Merge branch 'refact/tsdb_optimize' of https://github.com/taosdata/TDengine into refact/tsdb_new_format
This commit is contained in:
commit
305b6e70c0
|
@ -258,10 +258,12 @@ static const SSysDbTableSchema subscriptionSchema[] = {
|
|||
};
|
||||
|
||||
static const SSysDbTableSchema vnodesSchema[] = {
|
||||
{.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||
{.name = "dnode_endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = true},
|
||||
{.name = "replica", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true},
|
||||
{.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||
{.name = "dnode_ep", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
};
|
||||
|
||||
static const SSysTableMeta infosMeta[] = {
|
||||
|
|
|
@ -794,32 +794,43 @@ static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
if (pShow->pIter == NULL) break;
|
||||
|
||||
for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
||||
SColumnInfoData *pColInfo = NULL;
|
||||
cols = 0;
|
||||
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
|
||||
|
||||
SName name = {0};
|
||||
char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
|
||||
tNameGetDbName(&name, varDataVal(db));
|
||||
varDataSetLen(db, strlen(varDataVal(db)));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)db, false);
|
||||
|
||||
uint32_t val = 0;
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&val, false);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->replica, false);
|
||||
|
||||
char buf[20] = {0};
|
||||
STR_TO_VARSTR(buf, syncStr(pVgid->role));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)buf, false);
|
||||
|
||||
const char *dbname = mndGetDbStr(pVgroup->dbName);
|
||||
char b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
if (dbname != NULL) {
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(b1, dbname, TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
|
||||
} else {
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(b1, "NULL", TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
|
||||
}
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->replica, false); // onlines
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)b1, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pVgid->dnodeId, false);
|
||||
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||
char b2[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
if (pDnode != NULL) {
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(b2, pDnode->ep, TSDB_EP_LEN + VARSTR_HEADER_SIZE);
|
||||
} else {
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(b2, "NULL", TSDB_EP_LEN + VARSTR_HEADER_SIZE);
|
||||
}
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)b2, false);
|
||||
|
||||
numOfRows++;
|
||||
}
|
||||
|
|
|
@ -1308,7 +1308,7 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) {
|
|||
|
||||
SBlockData *pBDatal = &pCommitter->dWriter.bDatal;
|
||||
if (pBDatal->suid || pBDatal->uid) {
|
||||
if (pBDatal->suid != id.suid || id.uid == 0) {
|
||||
if ((pBDatal->suid != id.suid) || (id.suid == 0)) {
|
||||
if (pBDatal->nRow) {
|
||||
code = tsdbCommitLastBlock(pCommitter);
|
||||
if (code) goto _exit;
|
||||
|
|
|
@ -311,17 +311,6 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
|
|||
if (code) goto _err;
|
||||
}
|
||||
|
||||
// sst ===========
|
||||
tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[0], fname);
|
||||
if (taosStatFile(fname, &size, NULL)) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
if (size != pSet->aSstF[0]->size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// sma =============
|
||||
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
|
||||
if (taosStatFile(fname, &size, NULL)) {
|
||||
|
@ -335,6 +324,19 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
|
|||
code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
// sst ===========
|
||||
for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
|
||||
tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname);
|
||||
if (taosStatFile(fname, &size, NULL)) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
if (size != pSet->aSstF[iSst]->size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -897,21 +899,26 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
|
|||
pSetOld->aSstF[0]->nRef = 1;
|
||||
} else {
|
||||
for (int32_t iSst = 0; iSst < pSetOld->nSstF; iSst++) {
|
||||
SSstFile *pSstFile = pSetOld->aSstF[iSst];
|
||||
nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSstFile);
|
||||
}
|
||||
if (pSetOld->aSstF[iSst]->commitID != pSetNew->aSstF[iSst]->commitID) {
|
||||
SSstFile *pSstFile = pSetOld->aSstF[iSst];
|
||||
nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSstFile);
|
||||
}
|
||||
|
||||
pSetOld->aSstF[iSst] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
|
||||
if (pSetOld->aSstF[iSst] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
pSetOld->aSstF[iSst] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
|
||||
if (pSetOld->aSstF[iSst] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->aSstF[iSst] = *pSetNew->aSstF[iSst];
|
||||
pSetOld->aSstF[iSst]->nRef = 1;
|
||||
} else {
|
||||
ASSERT(pSetOld->aSstF[iSst]->size == pSetOld->aSstF[iSst]->size);
|
||||
ASSERT(pSetOld->aSstF[iSst]->offset == pSetOld->aSstF[iSst]->offset);
|
||||
}
|
||||
*pSetOld->aSstF[iSst] = *pSetNew->aSstF[iSst];
|
||||
pSetOld->aSstF[iSst]->nRef = 1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -2244,9 +2244,12 @@ static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SRea
|
|||
}
|
||||
|
||||
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
|
||||
if (pOrderCheckInfo->tableUidList == NULL) {
|
||||
int32_t total = taosHashGetSize(pStatus->pTableMap);
|
||||
int32_t total = taosHashGetSize(pStatus->pTableMap);
|
||||
if (total == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pOrderCheckInfo->tableUidList == NULL) {
|
||||
pOrderCheckInfo->currentIndex = 0;
|
||||
pOrderCheckInfo->tableUidList = taosMemoryMalloc(total * sizeof(uint64_t));
|
||||
if (pOrderCheckInfo->tableUidList == NULL) {
|
||||
|
@ -2254,21 +2257,17 @@ static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderSt
|
|||
}
|
||||
|
||||
extractOrderedTableUidList(pOrderCheckInfo, pStatus);
|
||||
|
||||
uint64_t uid = pOrderCheckInfo->tableUidList[0];
|
||||
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
||||
} else {
|
||||
if (pStatus->pTableIter == NULL) { // it is the last block of a new file
|
||||
// ASSERT(pOrderCheckInfo->currentIndex == taosHashGetSize(pStatus->pTableMap));
|
||||
|
||||
pOrderCheckInfo->currentIndex = 0;
|
||||
uint64_t uid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
|
||||
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
||||
|
||||
// the tableMap has already updated
|
||||
if (pStatus->pTableIter == NULL) {
|
||||
void* p =
|
||||
taosMemoryRealloc(pOrderCheckInfo->tableUidList, taosHashGetSize(pStatus->pTableMap) * sizeof(uint64_t));
|
||||
void* p = taosMemoryRealloc(pOrderCheckInfo->tableUidList, total * sizeof(uint64_t));
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
|
|
@ -87,9 +87,7 @@ struct SqlFunctionCtx;
|
|||
|
||||
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||
void initResultRowInfo(SResultRowInfo* pResultRowInfo);
|
||||
|
||||
void initResultRow(SResultRow* pResultRow);
|
||||
void closeResultRow(SResultRow* pResultRow);
|
||||
void closeResultRow(SResultRow* pResultRow);
|
||||
|
||||
struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset);
|
||||
|
||||
|
|
|
@ -609,6 +609,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
|
|||
while (1) {
|
||||
SListNode* pn = tdListGetHead(pResultRowInfo->openWindow);
|
||||
SOpenWindowInfo* pOpenWin = (SOpenWindowInfo *)pn->data;
|
||||
|
||||
uint64_t groupId = pOpenWin->groupId;
|
||||
SResultRowPosition* p1 = &pOpenWin->pos;
|
||||
if (p->pageId == p1->pageId && p->offset == p1->offset) {
|
||||
|
@ -621,7 +622,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
|
|||
if (pr->closed) {
|
||||
ASSERT(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
|
||||
isResultRowInterpolated(pr, RESULT_ROW_END_INTERP));
|
||||
tdListPopHead(pResultRowInfo->openWindow);
|
||||
SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
|
||||
taosMemoryFree(pNode);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -650,7 +652,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
|
|||
|
||||
if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
|
||||
closeResultRow(pr);
|
||||
tdListPopHead(pResultRowInfo->openWindow);
|
||||
SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
|
||||
taosMemoryFree(pNode);
|
||||
} else { // the remains are can not be closed yet.
|
||||
break;
|
||||
}
|
||||
|
@ -1730,6 +1733,10 @@ void destroyIntervalOperatorInfo(void* param) {
|
|||
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
|
||||
cleanupBasicInfo(&pInfo->binfo);
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
cleanupExprSupp(&pInfo->scalarSupp);
|
||||
|
||||
tdListFree(pInfo->binfo.resultRowInfo.openWindow);
|
||||
|
||||
pInfo->pRecycledPages = taosArrayDestroy(pInfo->pRecycledPages);
|
||||
pInfo->pInterpCols = taosArrayDestroy(pInfo->pInterpCols);
|
||||
taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
|
||||
|
|
|
@ -107,6 +107,37 @@ if $data30 != 12 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
print =============== show vnodes
|
||||
sql show vnodes 1
|
||||
if $rows != 9 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data(4)[1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data(4)[2] != leader then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data(4)[3] != d2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data(4)[4] != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data(4)[5] != localhost:7100 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#sql show vnodes 'localhost:7100'
|
||||
#if $rows != 9 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
print =============== drop database
|
||||
sql drop database d2
|
||||
sql drop database d3
|
||||
|
|
|
@ -128,6 +128,7 @@ if $rows != 5 then
|
|||
return -1
|
||||
endi
|
||||
if $data00 != $rowNum then
|
||||
print expect $rowNum , actual: $data00
|
||||
return -1
|
||||
endi
|
||||
if $data10 != $rowNum then
|
||||
|
|
Loading…
Reference in New Issue