more code

This commit is contained in:
Hongze Cheng 2024-11-27 17:53:53 +08:00
parent 09b0fb20ca
commit 3ae5a72a4c
6 changed files with 228 additions and 26 deletions

View File

@ -171,6 +171,8 @@ typedef union {
typedef void (*TsdReaderNotifyCbFn)(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param); typedef void (*TsdReaderNotifyCbFn)(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param);
struct SFileSetReader;
typedef struct TsdReader { typedef struct TsdReader {
int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables); SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables);
@ -193,9 +195,10 @@ typedef struct TsdReader {
void (*tsdSetSetNotifyCb)(void* pReader, TsdReaderNotifyCbFn notifyFn, void* param); void (*tsdSetSetNotifyCb)(void* pReader, TsdReaderNotifyCbFn notifyFn, void* param);
// for fileset query // for fileset query
void *(*openFilesetReadCursor)(void *pVnode); int32_t (*fileSetReaderOpen)(void *pVnode, struct SFileSetReader **ppReader);
void *(*nextFilesetReadCursor)(void *cursor); int32_t (*fileSetReadNext)(struct SFileSetReader *);
void (*closeFilesetReadCursor)(void *pReader); int32_t (*fileSetGetEntryField)(struct SFileSetReader *, const char *, void *);
void (*fileSetReaderClose)(struct SFileSetReader **);
} TsdReader; } TsdReader;

View File

@ -425,7 +425,7 @@ static const SSysDbTableSchema filesetsFullSchema[] = {
{.name = "end_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "end_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "last_compact", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "last_compact", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "shold_compact", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = false}, {.name = "shold_compact", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = false},
{.name = "details", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, // {.name = "details", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
}; };
static const SSysDbTableSchema tsmaSchema[] = { static const SSysDbTableSchema tsmaSchema[] = {

View File

@ -339,6 +339,12 @@ struct SVnodeCfg {
#define TABLE_IS_COL_COMPRESSED(FLG) (((FLG) & (TABLE_COL_COMPRESSED)) != 0) #define TABLE_IS_COL_COMPRESSED(FLG) (((FLG) & (TABLE_COL_COMPRESSED)) != 0)
#define TABLE_SET_COL_COMPRESSED(FLG) ((FLG) |= TABLE_COL_COMPRESSED) #define TABLE_SET_COL_COMPRESSED(FLG) ((FLG) |= TABLE_COL_COMPRESSED)
struct SFileSetReader;
int32_t tsdbFileSetReaderOpen(void *pVnode, struct SFileSetReader **ppReader);
int32_t tsdbFileSetReaderNext(struct SFileSetReader *pReader);
int32_t tsdbFileSetGetEntryField(struct SFileSetReader *pReader, const char *field, void *value);
void tsdbFileSetReaderClose(struct SFileSetReader **ppReader);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -1239,3 +1239,118 @@ void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid) {
} }
} }
} }
struct SFileSetReader {
STsdb *pTsdb;
int32_t fid;
int64_t startTime;
int64_t endTime;
STFileSet *pFileSet;
};
int32_t tsdbFileSetReaderOpen(void *pVnode, struct SFileSetReader **ppReader) {
if (pVnode == NULL || ppReader == NULL) {
return TSDB_CODE_INVALID_PARA;
}
STsdb *pTsdb = ((SVnode *)pVnode)->pTsdb;
(*ppReader) = taosMemoryCalloc(1, sizeof(struct SFileSetReader));
if (*ppReader == NULL) {
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, __LINE__,
tstrerror(terrno));
return terrno;
}
(*ppReader)->pTsdb = pTsdb;
(*ppReader)->fid = INT32_MIN;
(*ppReader)->pFileSet = NULL;
return TSDB_CODE_SUCCESS;
}
static int32_t tsdbFileSetReaderNextNoLock(struct SFileSetReader *pReader) {
STsdb *pTsdb = pReader->pTsdb;
tsdbTFileSetClear(&pReader->pFileSet);
STFileSet *fset = &(STFileSet){
.fid = pReader->fid,
};
STFileSet **fsetPtr = TARRAY2_SEARCH(pReader->pTsdb->pFS->fSetArr, &fset, tsdbTFileSetCmprFn, TD_GT);
if (fsetPtr == NULL) {
pReader->fid = INT32_MAX;
return TSDB_CODE_NOT_FOUND;
}
pReader->fid = (*fsetPtr)->fid;
tsdbFidKeyRange(pReader->fid, pTsdb->keepCfg.days, pTsdb->keepCfg.precision, &pReader->startTime, &pReader->endTime);
return tsdbTFileSetInitRef(pReader->pTsdb, *fsetPtr, &pReader->pFileSet);
}
int32_t tsdbFileSetReaderNext(struct SFileSetReader *pReader) {
int32_t code = TSDB_CODE_SUCCESS;
(void)taosThreadMutexLock(&pReader->pTsdb->mutex);
code = tsdbFileSetReaderNextNoLock(pReader);
(void)taosThreadMutexUnlock(&pReader->pTsdb->mutex);
return code;
}
int32_t tsdbFileSetGetEntryField(struct SFileSetReader *pReader, const char *field, void *value) {
const char *fieldName;
if (pReader->fid == INT32_MIN || pReader->fid == INT32_MAX) {
return TSDB_CODE_INVALID_PARA;
}
fieldName = "fileset_id";
if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
*(int32_t *)value = pReader->fid;
return TSDB_CODE_SUCCESS;
}
fieldName = "start_time";
if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
*(int64_t *)value = pReader->startTime;
return TSDB_CODE_SUCCESS;
}
fieldName = "end_time";
if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
*(int64_t *)value = pReader->endTime;
return TSDB_CODE_SUCCESS;
}
fieldName = "last_compact_time";
if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
*(int64_t *)value = 0; // TODO
return TSDB_CODE_SUCCESS;
}
fieldName = "should_compact";
if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
*(char *)value = 0; // TODO
return TSDB_CODE_SUCCESS;
}
fieldName = "details";
if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
// TODO
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_INVALID_PARA;
}
void tsdbFileSetReaderClose(struct SFileSetReader **ppReader) {
if (ppReader == NULL || *ppReader == NULL) {
return;
}
tsdbTFileSetClear(&(*ppReader)->pFileSet);
taosMemoryFree(*ppReader);
*ppReader = NULL;
return;
}

View File

@ -63,6 +63,12 @@ void initTsdbReaderAPI(TsdReader* pReader) {
pReader->tsdSetFilesetDelimited = (void (*)(void*))tsdbSetFilesetDelimited; pReader->tsdSetFilesetDelimited = (void (*)(void*))tsdbSetFilesetDelimited;
pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb; pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb;
// file set iterate
pReader->fileSetReaderOpen = tsdbFileSetReaderOpen;
pReader->fileSetReadNext = tsdbFileSetReaderNext;
pReader->fileSetGetEntryField = tsdbFileSetGetEntryField;
pReader->fileSetReaderClose = tsdbFileSetReaderClose;
} }
void initMetadataAPI(SStoreMeta* pMeta) { void initMetadataAPI(SStoreMeta* pMeta) {

View File

@ -73,6 +73,9 @@ typedef struct SSysTableScanInfo {
SLimitInfo limitInfo; SLimitInfo limitInfo;
int32_t tbnameSlotId; int32_t tbnameSlotId;
SStorageAPI* pAPI; SStorageAPI* pAPI;
// file set iterate
struct SFileSetReader* pFileSetReader;
} SSysTableScanInfo; } SSysTableScanInfo;
typedef struct { typedef struct {
@ -2008,10 +2011,9 @@ static SSDataBlock* sysTableBuildUserFileSets(SOperatorInfo* pOperator) {
SSDataBlock* p = NULL; SSDataBlock* p = NULL;
// open cursor if not opened // open cursor if not opened
// TODO: call corresponding api to open the cursor if (pInfo->pFileSetReader == NULL) {
if (pInfo->pCur == NULL) { code = pAPI->tsdReader.fileSetReaderOpen(pInfo->readHandle.vnode, &pInfo->pFileSetReader);
// pInfo->pCur = pAPI->tsdReader.openFileSetCursor(pInfo->readHandle.vnode); QUERY_CHECK_CODE(code, lino, _end);
// QUERY_CHECK_NULL(pInfo->pCur, code, lino, _end, terrno);
} }
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
@ -2042,14 +2044,96 @@ static SSDataBlock* sysTableBuildUserFileSets(SOperatorInfo* pOperator) {
// loop to query each entry // loop to query each entry
for (;;) { for (;;) {
void* entry = pAPI->tsdReader.nextFilesetReadCursor(pInfo->pCur); int32_t ret = pAPI->tsdReader.fileSetReadNext(pInfo->pFileSetReader);
if (entry == NULL) { if (ret) {
break; if (ret == TSDB_CODE_NOT_FOUND) {
// no more scan entry
break;
} else {
code = ret;
QUERY_CHECK_CODE(code, lino, _end);
}
} }
code = doSetQueryFileSetRow(); // fill the data block
QUERY_CHECK_CODE(code, lino, _end); {
SColumnInfoData* pColInfoData;
// dnode_id
int32_t dnodeId = 0; // TODO
pColInfoData = taosArrayGet(p->pDataBlock, 0);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&dnodeId, false);
QUERY_CHECK_CODE(code, lino, _end);
// db_name
pColInfoData = taosArrayGet(p->pDataBlock, 1);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, db, false);
QUERY_CHECK_CODE(code, lino, _end);
// vgroup_id
pColInfoData = taosArrayGet(p->pDataBlock, 2);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&vgId, false);
QUERY_CHECK_CODE(code, lino, _end);
// fileset_id
int32_t filesetId = 0;
code = pAPI->tsdReader.fileSetGetEntryField(pInfo->pFileSetReader, "fileset_id", &filesetId);
QUERY_CHECK_CODE(code, lino, _end);
pColInfoData = taosArrayGet(p->pDataBlock, 3);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&filesetId, false);
QUERY_CHECK_CODE(code, lino, _end);
// start_time
int64_t startTime = 0;
code = pAPI->tsdReader.fileSetGetEntryField(pInfo->pFileSetReader, "start_time", &startTime);
QUERY_CHECK_CODE(code, lino, _end);
pColInfoData = taosArrayGet(p->pDataBlock, 4);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&startTime, false);
QUERY_CHECK_CODE(code, lino, _end);
// end_time
int64_t endTime = 0;
code = pAPI->tsdReader.fileSetGetEntryField(pInfo->pFileSetReader, "end_time", &endTime);
QUERY_CHECK_CODE(code, lino, _end);
pColInfoData = taosArrayGet(p->pDataBlock, 5);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&endTime, false);
QUERY_CHECK_CODE(code, lino, _end);
// last_compact
int64_t lastCompacat = 0;
code = pAPI->tsdReader.fileSetGetEntryField(pInfo->pFileSetReader, "last_compact_time", &lastCompacat);
QUERY_CHECK_CODE(code, lino, _end);
pColInfoData = taosArrayGet(p->pDataBlock, 6);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&lastCompacat, false);
QUERY_CHECK_CODE(code, lino, _end);
// shold_compact
bool shouldCompact = false;
code = pAPI->tsdReader.fileSetGetEntryField(pInfo->pFileSetReader, "should_compact", &shouldCompact);
QUERY_CHECK_CODE(code, lino, _end);
pColInfoData = taosArrayGet(p->pDataBlock, 7);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&shouldCompact, false);
QUERY_CHECK_CODE(code, lino, _end);
// // details
// const char* details = NULL;
// code = pAPI->tsdReader.fileSetGetEntryField(pInfo->pFileSetReader, "details", &details);
// QUERY_CHECK_CODE(code, lino, _end);
// pColInfoData = taosArrayGet(p->pDataBlock, 8);
// QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
// code = colDataSetVal(pColInfoData, numOfRows, (char*)&vgId, false);
// QUERY_CHECK_CODE(code, lino, _end);
}
// check capacity
if (++numOfRows >= pOperator->resultInfo.capacity) { if (++numOfRows >= pOperator->resultInfo.capacity) {
p->info.rows = numOfRows; p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
@ -2064,15 +2148,12 @@ static SSDataBlock* sysTableBuildUserFileSets(SOperatorInfo* pOperator) {
numOfRows = 0; numOfRows = 0;
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur);
break; break;
} }
} }
} }
#if 0
if (numOfRows > 0) { if (numOfRows > 0) {
pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur);
p->info.rows = numOfRows; p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
@ -2089,23 +2170,14 @@ static SSDataBlock* sysTableBuildUserFileSets(SOperatorInfo* pOperator) {
blockDataDestroy(p); blockDataDestroy(p);
p = NULL; p = NULL;
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
if (ret != 0) {
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
pInfo->pCur = NULL;
setOperatorCompleted(pOperator);
}
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
#endif
_end: _end:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
blockDataDestroy(p); blockDataDestroy(p);
pTaskInfo->code = code; pTaskInfo->code = code;
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pAPI->tsdReader.fileSetReaderClose(&pInfo->pFileSetReader);
pInfo->pCur = NULL;
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;