Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/vnode_compact
This commit is contained in:
commit
03fa4578da
|
@ -2,7 +2,7 @@
|
||||||
# taosadapter
|
# taosadapter
|
||||||
ExternalProject_Add(taosadapter
|
ExternalProject_Add(taosadapter
|
||||||
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
|
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
|
||||||
GIT_TAG 213f8b3
|
GIT_TAG 3e08996
|
||||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
|
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
|
||||||
BINARY_DIR ""
|
BINARY_DIR ""
|
||||||
#BUILD_IN_SOURCE TRUE
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
|
|
@ -573,6 +573,68 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
|
||||||
return pResBlock;
|
return pResBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbInitReaderLock(STsdbReader* pReader) {
|
||||||
|
int32_t code = -1;
|
||||||
|
qTrace("tsdb/read: %p, pre-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||||
|
|
||||||
|
code = taosThreadMutexInit(&pReader->readerMutex, NULL);
|
||||||
|
|
||||||
|
qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbUninitReaderLock(STsdbReader* pReader) {
|
||||||
|
int32_t code = -1;
|
||||||
|
qTrace("tsdb/read: %p, pre-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||||
|
|
||||||
|
code = taosThreadMutexDestroy(&pReader->readerMutex);
|
||||||
|
|
||||||
|
qTrace("tsdb/read: %p, post-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbAcquireReader(STsdbReader* pReader) {
|
||||||
|
int32_t code = -1;
|
||||||
|
qTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||||
|
|
||||||
|
code = taosThreadMutexLock(&pReader->readerMutex);
|
||||||
|
|
||||||
|
qTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbTryAcquireReader(STsdbReader* pReader) {
|
||||||
|
int32_t code = -1;
|
||||||
|
qTrace("tsdb/read: %p, pre-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||||
|
|
||||||
|
code = taosThreadMutexTryLock(&pReader->readerMutex);
|
||||||
|
|
||||||
|
qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbReleaseReader(STsdbReader* pReader) {
|
||||||
|
int32_t code = -1;
|
||||||
|
qTrace("tsdb/read: %p, pre-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||||
|
|
||||||
|
code = taosThreadMutexUnlock(&pReader->readerMutex);
|
||||||
|
|
||||||
|
qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tsdbReleaseDataBlock(STsdbReader* pReader) {
|
||||||
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
if (!pStatus->composedDataBlock) {
|
||||||
|
tsdbReleaseReader(pReader);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
|
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
|
||||||
SSDataBlock* pResBlock, const char* idstr) {
|
SSDataBlock* pResBlock, const char* idstr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -636,7 +698,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
||||||
|
|
||||||
setColumnIdSlotList(&pReader->suppInfo, pCond->colList, pCond->pSlotList, pCond->numOfCols);
|
setColumnIdSlotList(&pReader->suppInfo, pCond->colList, pCond->pSlotList, pCond->numOfCols);
|
||||||
|
|
||||||
taosThreadMutexInit(&pReader->readerMutex, NULL);
|
tsdbInitReaderLock(pReader);
|
||||||
|
|
||||||
*ppReader = pReader;
|
*ppReader = pReader;
|
||||||
return code;
|
return code;
|
||||||
|
@ -4016,8 +4078,9 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
|
|
||||||
qTrace("tsdb/reader: %p, untake snapshot", pReader);
|
qTrace("tsdb/reader: %p, untake snapshot", pReader);
|
||||||
tsdbUntakeReadSnap(pReader, pReader->pReadSnap, true);
|
tsdbUntakeReadSnap(pReader, pReader->pReadSnap, true);
|
||||||
|
pReader->pReadSnap = NULL;
|
||||||
|
|
||||||
taosThreadMutexDestroy(&pReader->readerMutex);
|
tsdbUninitReaderLock(pReader);
|
||||||
|
|
||||||
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
|
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
|
||||||
SIOCostSummary* pCost = &pReader->cost;
|
SIOCostSummary* pCost = &pReader->cost;
|
||||||
|
@ -4164,16 +4227,16 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STsdbReader* pReader = pQHandle;
|
STsdbReader* pReader = pQHandle;
|
||||||
|
|
||||||
code = taosThreadMutexTryLock(&pReader->readerMutex);
|
code = tsdbTryAcquireReader(pReader);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (pReader->suspended) {
|
if (pReader->suspended) {
|
||||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
tsdbReleaseReader(pReader);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbReaderSuspend(pReader);
|
tsdbReaderSuspend(pReader);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
tsdbReleaseReader(pReader);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
} else if (code == EBUSY) {
|
} else if (code == EBUSY) {
|
||||||
|
@ -4274,8 +4337,9 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
|
||||||
qTrace("tsdb/read: %p, take read mutex", pReader);
|
int32_t code = tsdbAcquireReader(pReader);
|
||||||
taosThreadMutexLock(&pReader->readerMutex);
|
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
|
||||||
|
|
||||||
if (pReader->suspended) {
|
if (pReader->suspended) {
|
||||||
tsdbReaderResume(pReader);
|
tsdbReaderResume(pReader);
|
||||||
}
|
}
|
||||||
|
@ -4287,7 +4351,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
pStatus = &pReader->innerReader[0]->status;
|
pStatus = &pReader->innerReader[0]->status;
|
||||||
if (pStatus->composedDataBlock) {
|
if (pStatus->composedDataBlock) {
|
||||||
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
||||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
tsdbReleaseReader(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -4310,7 +4374,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
if (ret) {
|
if (ret) {
|
||||||
if (pStatus->composedDataBlock) {
|
if (pStatus->composedDataBlock) {
|
||||||
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
||||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
tsdbReleaseReader(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -4330,7 +4394,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
pStatus = &pReader->innerReader[1]->status;
|
pStatus = &pReader->innerReader[1]->status;
|
||||||
if (pStatus->composedDataBlock) {
|
if (pStatus->composedDataBlock) {
|
||||||
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
||||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
tsdbReleaseReader(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret1;
|
return ret1;
|
||||||
|
@ -4338,7 +4402,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
||||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
tsdbReleaseReader(pReader);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -4497,13 +4561,6 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
|
||||||
return pReader->pResBlock;
|
return pReader->pResBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbReleaseDataBlock(STsdbReader* pReader) {
|
|
||||||
// SReaderStatus* pStatus = &pReader->status;
|
|
||||||
// if (!pStatus->composedDataBlock) {
|
|
||||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
|
||||||
//}
|
|
||||||
}
|
|
||||||
|
|
||||||
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
||||||
STsdbReader* pTReader = pReader;
|
STsdbReader* pTReader = pReader;
|
||||||
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
||||||
|
@ -4522,7 +4579,7 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
||||||
SSDataBlock* ret = doRetrieveDataBlock(pTReader);
|
SSDataBlock* ret = doRetrieveDataBlock(pTReader);
|
||||||
|
|
||||||
qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader);
|
qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader);
|
||||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
tsdbReleaseReader(pReader);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -4531,7 +4588,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
|
||||||
qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
|
qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
|
||||||
taosThreadMutexLock(&pReader->readerMutex);
|
tsdbAcquireReader(pReader);
|
||||||
|
|
||||||
if (pReader->suspended) {
|
if (pReader->suspended) {
|
||||||
tsdbReaderResume(pReader);
|
tsdbReaderResume(pReader);
|
||||||
|
@ -4540,7 +4597,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
|
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
|
||||||
tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap);
|
tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
tsdbReleaseReader(pReader);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -4578,7 +4635,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
|
tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
|
||||||
numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
tsdbReleaseReader(pReader);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -4589,7 +4646,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
|
pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
tsdbReleaseReader(pReader);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -4674,7 +4731,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
|
||||||
int64_t rows = 0;
|
int64_t rows = 0;
|
||||||
|
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
taosThreadMutexLock(&pReader->readerMutex);
|
tsdbAcquireReader(pReader);
|
||||||
if (pReader->suspended) {
|
if (pReader->suspended) {
|
||||||
tsdbReaderResume(pReader);
|
tsdbReaderResume(pReader);
|
||||||
}
|
}
|
||||||
|
@ -4704,7 +4761,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
|
||||||
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
|
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
tsdbReleaseReader(pReader);
|
||||||
|
|
||||||
return rows;
|
return rows;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue