refact more

This commit is contained in:
Hongze Cheng 2022-04-25 12:23:00 +00:00
parent bb8384e82c
commit 883a65f63b
4 changed files with 91 additions and 101 deletions

View File

@ -114,8 +114,6 @@ int metaDropTable(SMeta* pMeta, tb_uid_t uid);
int metaCommit(SMeta* pMeta); int metaCommit(SMeta* pMeta);
int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg); int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
STbCfg* metaGetTbInfoByUid(SMeta* pMeta, tb_uid_t uid);
STbCfg* metaGetTbInfoByName(SMeta* pMeta, char* tbname, tb_uid_t* uid);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
void* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid, bool isDecode); void* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid, bool isDecode);

View File

@ -132,39 +132,43 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
return 0; return 0;
} }
STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) { SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
#if 0 void *pKey = NULL;
int ret; void *pVal = NULL;
SMetaDB *pMetaDb = pMeta->pDB; int kLen = 0;
void *pKey; int vLen = 0;
void *pVal; int ret;
int kLen; SSkmDbKey skmDbKey;
int vLen; SSchemaWrapper *pSW = NULL;
STbCfg *pTbCfg; SSchema *pSchema = NULL;
void *pBuf;
SCoder coder = {0};
// Fetch // fetch
pKey = &uid; skmDbKey.uid = uid;
kLen = sizeof(uid); skmDbKey.sver = sver;
pVal = NULL; pKey = &skmDbKey;
ret = tdbDbGet(pMetaDb->pTbDB, pKey, kLen, &pVal, &vLen); kLen = sizeof(skmDbKey);
ret = tdbDbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen);
if (ret < 0) { if (ret < 0) {
return NULL; return NULL;
} }
// Decode // decode
pTbCfg = taosMemoryMalloc(sizeof(*pTbCfg)); pBuf = pVal;
metaDecodeTbInfo(pVal, pTbCfg); pSW = taosMemoryMalloc(sizeof(pSW));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_DECODER);
tDecodeSSchemaWrapper(&coder, pSW);
pSchema = taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
memcpy(pSchema, pSW->pSchema, sizeof(SSchema) * pSW->nCols);
tCoderClear(&coder);
pSW->pSchema = pSchema;
TDB_FREE(pVal); TDB_FREE(pVal);
return pTbCfg; return pSW;
#endif
return NULL;
}
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
// return metaGetTableSchemaImpl(pMeta, uid, sver, isinline, false);
return NULL;
} }
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
@ -216,26 +220,25 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
} }
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
#if 0
tb_uid_t quid; tb_uid_t quid;
SSchemaWrapper *pSW; SMetaReader mr = {0};
STSchemaBuilder sb; STSchema *pTSchema = NULL;
SSchemaWrapper *pSW = NULL;
STSchemaBuilder sb = {0};
SSchema *pSchema; SSchema *pSchema;
STSchema *pTSchema;
STbCfg *pTbCfg;
pTbCfg = metaGetTbInfoByUid(pMeta, uid); metaReaderInit(&mr, pMeta->pVnode, 0);
if (pTbCfg->type == META_CHILD_TABLE) { metaGetTableEntryByUid(&mr, uid);
quid = pTbCfg->ctbCfg.suid;
if (mr.me.type == TSDB_CHILD_TABLE) {
quid = mr.me.ctbEntry.suid;
} else { } else {
quid = uid; quid = uid;
} }
pSW = metaGetTableSchemaImpl(pMeta, quid, sver, true, true); metaReaderClear(&mr);
if (pSW == NULL) {
return NULL;
}
pSW = metaGetTableSchema(pMeta, quid, sver, 0);
tdInitTSchemaBuilder(&sb, 0); tdInitTSchemaBuilder(&sb, 0);
for (int i = 0; i < pSW->nCols; i++) { for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i; pSchema = pSW->pSchema + i;
@ -244,9 +247,9 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
pTSchema = tdGetSchemaFromBuilder(&sb); pTSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb); tdDestroyTSchemaBuilder(&sb);
taosMemoryFree(pSW->pSchema);
taosMemoryFree(pSW);
return pTSchema; return pTSchema;
#endif
return NULL;
} }
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) { STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
@ -314,36 +317,6 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
return NULL; return NULL;
} }
STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
#if 0
void *pKey;
void *pVal;
void *ppKey;
int pkLen;
int kLen;
int vLen;
int ret;
pKey = tbname;
kLen = strlen(tbname) + 1;
pVal = NULL;
ppKey = NULL;
ret = tdbDbPGet(pMeta->pDB->pNameIdx, pKey, kLen, &ppKey, &pkLen, &pVal, &vLen);
if (ret < 0) {
return NULL;
}
ASSERT(pkLen == kLen + sizeof(uid));
*uid = *(tb_uid_t *)POINTER_SHIFT(ppKey, kLen);
TDB_FREE(ppKey);
TDB_FREE(pVal);
return metaGetTbInfoByUid(pMeta, *uid);
#endif
return NULL;
}
int metaGetTbNum(SMeta *pMeta) { int metaGetTbNum(SMeta *pMeta) {
// TODO // TODO
// ASSERT(0); // ASSERT(0);

View File

@ -90,10 +90,15 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
if (pHandle->sver != sversion) { if (pHandle->sver != sversion) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion); pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion);
tb_uid_t quid; tb_uid_t quid;
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid); SMetaReader mr = {0};
if (pTbCfg->type == META_CHILD_TABLE) {
quid = pTbCfg->ctb.suid; metaReaderInit(&mr, pHandle->pVnodeMeta->pVnode, 0);
metaGetTableEntryByUid(&mr, pHandle->pBlock->uid);
if (mr.me.type == META_CHILD_TABLE) {
quid = mr.me.ctbEntry.suid;
} else { } else {
quid = pHandle->pBlock->uid; quid = pHandle->pBlock->uid;
} }

View File

@ -98,24 +98,24 @@ typedef struct SIOCostSummary {
} SIOCostSummary; } SIOCostSummary;
typedef struct STsdbReadHandle { typedef struct STsdbReadHandle {
STsdb* pTsdb; STsdb* pTsdb;
SQueryFilePos cur; // current position SQueryFilePos cur; // current position
int16_t order; int16_t order;
STimeWindow window; // the primary query time window that applies to all queries STimeWindow window; // the primary query time window that applies to all queries
SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
int32_t numOfBlocks; int32_t numOfBlocks;
SArray* pColumns; // column list, SColumnInfoData array list SArray* pColumns; // column list, SColumnInfoData array list
bool locateStart; bool locateStart;
int32_t outputCapacity; int32_t outputCapacity;
int32_t realNumOfRows; int32_t realNumOfRows;
SArray* pTableCheckInfo; // SArray<STableCheckInfo> SArray* pTableCheckInfo; // SArray<STableCheckInfo>
int32_t activeIndex; int32_t activeIndex;
bool checkFiles; // check file stage bool checkFiles; // check file stage
int8_t cachelastrow; // check if last row cached int8_t cachelastrow; // check if last row cached
bool loadExternalRow; // load time window external data rows bool loadExternalRow; // load time window external data rows
bool currentLoadExternalRows; // current load external rows bool currentLoadExternalRows; // current load external rows
int32_t loadType; // block load type int32_t loadType; // block load type
char* idStr; // query info handle, for debug purpose char* idStr; // query info handle, for debug purpose
int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
SDFileSet* pFileGroup; SDFileSet* pFileGroup;
SFSIter fileIter; SFSIter fileIter;
@ -1443,7 +1443,7 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
j++; j++;
i++; i++;
} else { // pColInfo->info.colId < src->colId, it is a NULL data } else { // pColInfo->info.colId < src->colId, it is a NULL data
int32_t rowIndex = numOfRows; int32_t rowIndex = numOfRows;
for (int32_t k = start; k < num + start; ++k, ++rowIndex) { // TODO opt performance for (int32_t k = start; k < num + start; ++k, ++rowIndex) { // TODO opt performance
colDataAppend(pColInfo, rowIndex, NULL, true); colDataAppend(pColInfo, rowIndex, NULL, true);
@ -1454,10 +1454,11 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
while (i < requiredNumOfCols) { // the remain columns are all null data while (i < requiredNumOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
int32_t rowIndex = numOfRows; int32_t rowIndex = numOfRows;
for (int32_t k = start; k < num + start; ++k, ++rowIndex) { for (int32_t k = start; k < num + start; ++k, ++rowIndex) {
colDataAppend(pColInfo, rowIndex, NULL, true); // TODO add a fast version to set a number of consecutive NULL value. colDataAppend(pColInfo, rowIndex, NULL,
true); // TODO add a fast version to set a number of consecutive NULL value.
} }
i++; i++;
} }
@ -1777,7 +1778,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
STable* pTable = NULL; STable* pTable = NULL;
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo); int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64 " rows:%d, start:%d, end:%d, %s", tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
" rows:%d, start:%d, end:%d, %s",
pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows,
cur->pos, endPos, pTsdbReadHandle->idStr); cur->pos, endPos, pTsdbReadHandle->idStr);
@ -3626,20 +3628,25 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd
int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len, int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo, int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) { SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid); SMetaReader mr = {0};
if (pTbCfg == NULL) {
metaReaderInit(&mr, ((SMeta*)pMeta)->pVnode, 0);
if (metaGetTableEntryByUid(&mr, uid) < 0) {
tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId); tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
goto _error; goto _error;
} }
if (pTbCfg->type != META_SUPER_TABLE) { if (mr.me.type != META_SUPER_TABLE) {
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId,
reqId); reqId);
terrno = TSDB_CODE_OPS_NOT_SUPPORT; // basically, this error is caused by invalid sql issued by client terrno = TSDB_CODE_OPS_NOT_SUPPORT; // basically, this error is caused by invalid sql issued by client
goto _error; goto _error;
} }
metaReaderClear(&mr);
// NOTE: not add ref count for super table // NOTE: not add ref count for super table
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 0, true); SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 0, true);
@ -3690,12 +3697,18 @@ int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) { int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid); SMeta* metaP = (SMeta*)pMeta;
if (pTbCfg == NULL) { SMetaReader mr = {0};
metaReaderInit(&mr, metaP->pVnode, 0);
if (metaGetTableEntryByUid(&mr, uid) < 0) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
goto _error; goto _error;
} }
metaReaderClear(&mr);
pGroupInfo->numOfTables = 1; pGroupInfo->numOfTables = 1;
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
@ -3708,6 +3721,7 @@ int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGr
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
metaReaderClear(&mr);
return terrno; return terrno;
} }