diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in index 13b247770e..d156057459 100644 --- a/cmake/taosadapter_CMakeLists.txt.in +++ b/cmake/taosadapter_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosadapter ExternalProject_Add(taosadapter GIT_REPOSITORY https://github.com/taosdata/taosadapter.git - GIT_TAG 213f8b3 + GIT_TAG 3e08996 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 588debbe52..a2212292a7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -573,6 +573,68 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) return pResBlock; } +static int32_t tsdbInitReaderLock(STsdbReader* pReader) { + int32_t code = -1; + qTrace("tsdb/read: %p, pre-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + + code = taosThreadMutexInit(&pReader->readerMutex, NULL); + + qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + + return code; +} + +static int32_t tsdbUninitReaderLock(STsdbReader* pReader) { + int32_t code = -1; + qTrace("tsdb/read: %p, pre-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + + code = taosThreadMutexDestroy(&pReader->readerMutex); + + qTrace("tsdb/read: %p, post-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + + return code; +} + +static int32_t tsdbAcquireReader(STsdbReader* pReader) { + int32_t code = -1; + qTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + + code = taosThreadMutexLock(&pReader->readerMutex); + + qTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + + return code; +} + +static int32_t tsdbTryAcquireReader(STsdbReader* pReader) { + int32_t code = -1; + qTrace("tsdb/read: %p, pre-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + + code = taosThreadMutexTryLock(&pReader->readerMutex); + + qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + + return code; +} + +static int32_t tsdbReleaseReader(STsdbReader* pReader) { + int32_t code = -1; + qTrace("tsdb/read: %p, pre-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + + code = taosThreadMutexUnlock(&pReader->readerMutex); + + qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + + return code; +} + +void tsdbReleaseDataBlock(STsdbReader* pReader) { + SReaderStatus* pStatus = &pReader->status; + if (!pStatus->composedDataBlock) { + tsdbReleaseReader(pReader); + } +} + static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity, SSDataBlock* pResBlock, const char* idstr) { int32_t code = 0; @@ -636,7 +698,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd setColumnIdSlotList(&pReader->suppInfo, pCond->colList, pCond->pSlotList, pCond->numOfCols); - taosThreadMutexInit(&pReader->readerMutex, NULL); + tsdbInitReaderLock(pReader); *ppReader = pReader; return code; @@ -4016,8 +4078,9 @@ void tsdbReaderClose(STsdbReader* pReader) { qTrace("tsdb/reader: %p, untake snapshot", pReader); tsdbUntakeReadSnap(pReader, pReader->pReadSnap, true); + pReader->pReadSnap = NULL; - taosThreadMutexDestroy(&pReader->readerMutex); + tsdbUninitReaderLock(pReader); taosMemoryFree(pReader->status.uidCheckInfo.tableUidList); SIOCostSummary* pCost = &pReader->cost; @@ -4164,16 +4227,16 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) { int32_t code = 0; STsdbReader* pReader = pQHandle; - code = taosThreadMutexTryLock(&pReader->readerMutex); + code = tsdbTryAcquireReader(pReader); if (code == 0) { if (pReader->suspended) { - taosThreadMutexUnlock(&pReader->readerMutex); + tsdbReleaseReader(pReader); return code; } tsdbReaderSuspend(pReader); - taosThreadMutexUnlock(&pReader->readerMutex); + tsdbReleaseReader(pReader); return code; } else if (code == EBUSY) { @@ -4274,8 +4337,9 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; - qTrace("tsdb/read: %p, take read mutex", pReader); - taosThreadMutexLock(&pReader->readerMutex); + int32_t code = tsdbAcquireReader(pReader); + qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code); + if (pReader->suspended) { tsdbReaderResume(pReader); } @@ -4287,7 +4351,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { pStatus = &pReader->innerReader[0]->status; if (pStatus->composedDataBlock) { qTrace("tsdb/read: %p, unlock read mutex", pReader); - taosThreadMutexUnlock(&pReader->readerMutex); + tsdbReleaseReader(pReader); } return ret; @@ -4310,7 +4374,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (ret) { if (pStatus->composedDataBlock) { qTrace("tsdb/read: %p, unlock read mutex", pReader); - taosThreadMutexUnlock(&pReader->readerMutex); + tsdbReleaseReader(pReader); } return ret; @@ -4330,7 +4394,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { pStatus = &pReader->innerReader[1]->status; if (pStatus->composedDataBlock) { qTrace("tsdb/read: %p, unlock read mutex", pReader); - taosThreadMutexUnlock(&pReader->readerMutex); + tsdbReleaseReader(pReader); } return ret1; @@ -4338,7 +4402,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { } qTrace("tsdb/read: %p, unlock read mutex", pReader); - taosThreadMutexUnlock(&pReader->readerMutex); + tsdbReleaseReader(pReader); return false; } @@ -4497,13 +4561,6 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { return pReader->pResBlock; } -void tsdbReleaseDataBlock(STsdbReader* pReader) { - // SReaderStatus* pStatus = &pReader->status; - // if (!pStatus->composedDataBlock) { - taosThreadMutexUnlock(&pReader->readerMutex); - //} -} - SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { STsdbReader* pTReader = pReader; if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { @@ -4522,7 +4579,7 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { SSDataBlock* ret = doRetrieveDataBlock(pTReader); qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader); - taosThreadMutexUnlock(&pReader->readerMutex); + tsdbReleaseReader(pReader); return ret; } @@ -4531,7 +4588,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { SReaderStatus* pStatus = &pReader->status; qTrace("tsdb/reader-reset: %p, take read mutex", pReader); - taosThreadMutexLock(&pReader->readerMutex); + tsdbAcquireReader(pReader); if (pReader->suspended) { tsdbReaderResume(pReader); @@ -4540,7 +4597,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) { tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap); - taosThreadMutexUnlock(&pReader->readerMutex); + tsdbReleaseReader(pReader); return TSDB_CODE_SUCCESS; } @@ -4578,7 +4635,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr); - taosThreadMutexUnlock(&pReader->readerMutex); + tsdbReleaseReader(pReader); return code; } @@ -4589,7 +4646,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey, pReader->idStr); - taosThreadMutexUnlock(&pReader->readerMutex); + tsdbReleaseReader(pReader); return code; } @@ -4674,7 +4731,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { int64_t rows = 0; SReaderStatus* pStatus = &pReader->status; - taosThreadMutexLock(&pReader->readerMutex); + tsdbAcquireReader(pReader); if (pReader->suspended) { tsdbReaderResume(pReader); } @@ -4704,7 +4761,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); } - taosThreadMutexUnlock(&pReader->readerMutex); + tsdbReleaseReader(pReader); return rows; }