Merge remote-tracking branch 'origin/feat/tsdb_refact' into feat/tsdb_refact_wxy
This commit is contained in:
commit
3e80533bbe
|
@ -205,6 +205,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "commit vnode", NULL, NULL)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
|
||||
|
|
|
@ -250,8 +250,15 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf);
|
|||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||
void tsdbCloseCache(SLRUCache *pCache);
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
|
||||
|
||||
// bug api, deprecated, USE H version
|
||||
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow);
|
||||
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid);
|
||||
|
||||
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
||||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
||||
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
|
||||
|
||||
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
|
||||
// structs =======================
|
||||
typedef struct {
|
||||
|
|
|
@ -51,6 +51,44 @@ static void getTableCacheKey(tb_uid_t uid, const char *cacheType, char *key, int
|
|||
|
||||
static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value) { taosMemoryFree(value); }
|
||||
|
||||
static int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
||||
getTableCacheKey(uid, "lr", key, &keyLen);
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (h) {
|
||||
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||
if (pRow->ts <= eKey) {
|
||||
taosLRUCacheRelease(pCache, h, true);
|
||||
} else {
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
}
|
||||
|
||||
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
||||
getTableCacheKey(uid, "l", key, &keyLen);
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (h) {
|
||||
// clear last cache anyway, no matter where eKey ends.
|
||||
taosLRUCacheRelease(pCache, h, true);
|
||||
|
||||
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
|
||||
int32_t code = 0;
|
||||
STSRow *cacheRow = NULL;
|
||||
|
@ -65,7 +103,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
|
|||
if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) {
|
||||
tdRowCpy(cacheRow, row);
|
||||
} else {
|
||||
tsdbCacheDeleteLastrow(pCache, uid);
|
||||
tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX);
|
||||
tsdbCacheInsertLastrow(pCache, uid, row);
|
||||
}
|
||||
}
|
||||
|
@ -97,7 +135,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
|
|||
if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) {
|
||||
tdRowCpy(cacheRow, row);
|
||||
} else {
|
||||
tsdbCacheDeleteLastrow(pCache, uid);
|
||||
tsdbCacheDeleteLast(pCache, uid, TSKEY_MAX);
|
||||
tsdbCacheInsertLastrow(pCache, uid, row);
|
||||
}
|
||||
}
|
||||
|
@ -581,6 +619,10 @@ typedef struct TsdbNextRowState {
|
|||
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
|
||||
int16_t nCol = pTSchema->numOfCols;
|
||||
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
|
||||
|
||||
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
||||
|
||||
STbData *pMem = NULL;
|
||||
|
@ -597,6 +639,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
|||
|
||||
SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
|
||||
|
||||
SDelIdx delIdx;
|
||||
|
||||
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
|
||||
if (pDelFile) {
|
||||
SDelFReader *pDelFReader;
|
||||
|
@ -604,7 +648,6 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
|||
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
|
||||
if (code) goto _err;
|
||||
|
||||
SDelIdx delIdx;
|
||||
code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
|
@ -612,6 +655,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
|||
if (code) goto _err;
|
||||
|
||||
tsdbDelFReaderClose(pDelFReader);
|
||||
} else {
|
||||
code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
int iSkyline = taosArrayGetSize(pSkyline) - 1;
|
||||
|
@ -644,7 +690,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
|||
input[1].next = true;
|
||||
}
|
||||
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
|
||||
int16_t nilColCount = nCol - 1; // count of null & none cols
|
||||
int iCol = 0; // index of first nil col index from left to right
|
||||
bool setICol = false;
|
||||
|
||||
do {
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
|
@ -667,15 +715,17 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
|||
TSDBROW *max[3] = {0};
|
||||
int iMax[3] = {-1, -1, -1};
|
||||
int nMax = 0;
|
||||
TSKEY maxKey = TSKEY_MIN;
|
||||
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (input[i].pRow != NULL) {
|
||||
if (!input[i].stop && input[i].pRow != NULL) {
|
||||
TSDBKEY key = TSDBROW_KEY(input[i].pRow);
|
||||
TSDBKEY maxKey = TSDBROW_KEY(max[nMax]);
|
||||
|
||||
// merging & deduplicating on client side
|
||||
if (maxKey.ts <= key.ts) {
|
||||
if (maxKey.ts < key.ts) {
|
||||
if (maxKey <= key.ts) {
|
||||
if (maxKey < key.ts) {
|
||||
nMax = 0;
|
||||
maxKey = key.ts;
|
||||
}
|
||||
|
||||
iMax[nMax] = i;
|
||||
|
@ -686,6 +736,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
|||
|
||||
// delete detection
|
||||
TSDBROW *merge[3] = {0};
|
||||
int iMerge[3] = {-1, -1, -1};
|
||||
int nMerge = 0;
|
||||
for (int i = 0; i < nMax; ++i) {
|
||||
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
|
||||
|
@ -693,6 +744,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
|||
// bool deleted = false;
|
||||
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
|
||||
if (!deleted) {
|
||||
iMerge[nMerge] = i;
|
||||
merge[nMerge++] = max[i];
|
||||
}
|
||||
|
||||
|
@ -716,6 +768,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
|||
tRowMergerClear(&merger);
|
||||
}
|
||||
}
|
||||
|
||||
} while (*ppRow == NULL);
|
||||
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
|
@ -727,6 +780,245 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
|
||||
int16_t nCol = pTSchema->numOfCols;
|
||||
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
|
||||
|
||||
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
||||
|
||||
STbData *pMem = NULL;
|
||||
if (pTsdb->mem) {
|
||||
tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
|
||||
}
|
||||
|
||||
STbData *pIMem = NULL;
|
||||
if (pTsdb->imem) {
|
||||
tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
|
||||
}
|
||||
|
||||
*ppRow = NULL;
|
||||
|
||||
SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
|
||||
|
||||
SDelIdx delIdx;
|
||||
|
||||
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
|
||||
if (pDelFile) {
|
||||
SDelFReader *pDelFReader;
|
||||
|
||||
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
|
||||
if (code) goto _err;
|
||||
|
||||
code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline);
|
||||
if (code) goto _err;
|
||||
|
||||
tsdbDelFReaderClose(pDelFReader);
|
||||
} else {
|
||||
code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
int iSkyline = taosArrayGetSize(pSkyline) - 1;
|
||||
|
||||
SBlockIdx idx = {.suid = suid, .uid = uid};
|
||||
|
||||
SFSNextRowIter fsState = {0};
|
||||
fsState.state = SFSNEXTROW_FS;
|
||||
fsState.pTsdb = pTsdb;
|
||||
fsState.pBlockIdxExp = &idx;
|
||||
|
||||
SMemNextRowIter memState = {0};
|
||||
SMemNextRowIter imemState = {0};
|
||||
TSDBROW memRow, imemRow, fsRow;
|
||||
|
||||
TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem},
|
||||
{&imemRow, true, false, &imemState, getNextRowFromMem},
|
||||
{&fsRow, false, true, &fsState, getNextRowFromFS}};
|
||||
|
||||
if (pMem) {
|
||||
memState.pMem = pMem;
|
||||
memState.state = SMEMNEXTROW_ENTER;
|
||||
input[0].stop = false;
|
||||
input[0].next = true;
|
||||
}
|
||||
if (pIMem) {
|
||||
imemState.pMem = pIMem;
|
||||
imemState.state = SMEMNEXTROW_ENTER;
|
||||
input[1].stop = false;
|
||||
input[1].next = true;
|
||||
}
|
||||
|
||||
int16_t nilColCount = nCol - 1; // count of null & none cols
|
||||
int iCol = 0; // index of first nil col index from left to right
|
||||
bool setICol = false;
|
||||
|
||||
do {
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (input[i].next && !input[i].stop) {
|
||||
code = input[i].nextRowFn(input[i].iter, &input[i].pRow);
|
||||
if (code) goto _err;
|
||||
|
||||
if (input[i].pRow == NULL) {
|
||||
input[i].stop = true;
|
||||
input[i].next = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (input[0].stop && input[1].stop && input[2].stop) {
|
||||
break;
|
||||
}
|
||||
|
||||
// select maxpoint(s) from mem, imem, fs
|
||||
TSDBROW *max[3] = {0};
|
||||
int iMax[3] = {-1, -1, -1};
|
||||
int nMax = 0;
|
||||
TSKEY maxKey = TSKEY_MIN;
|
||||
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (!input[i].stop && input[i].pRow != NULL) {
|
||||
TSDBKEY key = TSDBROW_KEY(input[i].pRow);
|
||||
|
||||
// merging & deduplicating on client side
|
||||
if (maxKey <= key.ts) {
|
||||
if (maxKey < key.ts) {
|
||||
nMax = 0;
|
||||
maxKey = key.ts;
|
||||
}
|
||||
|
||||
iMax[nMax] = i;
|
||||
max[nMax++] = input[i].pRow;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// delete detection
|
||||
TSDBROW *merge[3] = {0};
|
||||
int iMerge[3] = {-1, -1, -1};
|
||||
int nMerge = 0;
|
||||
for (int i = 0; i < nMax; ++i) {
|
||||
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
|
||||
|
||||
// bool deleted = false;
|
||||
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
|
||||
if (!deleted) {
|
||||
iMerge[nMerge] = i;
|
||||
merge[nMerge++] = max[i];
|
||||
}
|
||||
|
||||
input[iMax[i]].next = deleted;
|
||||
}
|
||||
|
||||
// merge if nMerge > 1
|
||||
if (nMerge > 0) {
|
||||
if (nMerge == 1) {
|
||||
code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
// merge 2 or 3 rows
|
||||
SRowMerger merger = {0};
|
||||
|
||||
tRowMergerInit(&merger, merge[0], pTSchema);
|
||||
for (int i = 1; i < nMerge; ++i) {
|
||||
tRowMerge(&merger, merge[i]);
|
||||
}
|
||||
tRowMergerGetRow(&merger, ppRow);
|
||||
tRowMergerClear(&merger);
|
||||
}
|
||||
}
|
||||
|
||||
if (iCol == 0) {
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
SColVal *pColVal = &(SColVal){0};
|
||||
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = maxKey});
|
||||
|
||||
if (taosArrayPush(pColArray, pColVal) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
++iCol;
|
||||
|
||||
setICol = false;
|
||||
for (int16_t i = iCol; iCol < nCol; ++i) {
|
||||
// tsdbRowGetColVal(*ppRow, pTSchema, i, pColVal);
|
||||
if (taosArrayPush(pColArray, pColVal) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (pColVal->isNull || pColVal->isNone) {
|
||||
for (int j = 0; j < nMerge; ++j) {
|
||||
SColVal jColVal = {0};
|
||||
tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal);
|
||||
if (jColVal.isNull || jColVal.isNone) {
|
||||
input[iMerge[j]].next = true;
|
||||
}
|
||||
}
|
||||
if (!setICol) {
|
||||
iCol = i;
|
||||
setICol = true;
|
||||
}
|
||||
} else {
|
||||
--nilColCount;
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
setICol = false;
|
||||
for (int16_t i = iCol; i < nCol; ++i) {
|
||||
SColVal colVal = {0};
|
||||
tTSRowGetVal(*ppRow, pTSchema, i, &colVal);
|
||||
|
||||
SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, i);
|
||||
|
||||
if (!colVal.isNone && !colVal.isNull) {
|
||||
if (tColVal->isNull || tColVal->isNone) {
|
||||
taosArraySet(pColArray, i, &colVal);
|
||||
--nilColCount;
|
||||
}
|
||||
} else {
|
||||
if (tColVal->isNull || tColVal->isNone && !setICol) {
|
||||
iCol = i;
|
||||
setICol = true;
|
||||
|
||||
for (int j = 0; j < nMerge; ++j) {
|
||||
SColVal jColVal = {0};
|
||||
tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal);
|
||||
if (jColVal.isNull || jColVal.isNone) {
|
||||
input[iMerge[j]].next = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (nilColCount > 0);
|
||||
|
||||
// if () new ts row from pColArray if non empty
|
||||
if (taosArrayGetSize(pColArray) == nCol) {
|
||||
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
|
||||
if (code) goto _err;
|
||||
}
|
||||
taosArrayDestroy(pColArray);
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
|
||||
return code;
|
||||
_err:
|
||||
taosArrayDestroy(pColArray);
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
|
@ -749,9 +1041,11 @@ int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRo
|
|||
*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||
}
|
||||
|
||||
// taosLRUCacheRelease(pCache, h, true);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
|
@ -763,7 +1057,7 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow *
|
|||
*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||
} else {
|
||||
STSRow *pRow = NULL;
|
||||
// code = mergeLast(uid, pTsdb, &pRow);
|
||||
code = mergeLast(uid, pTsdb, &pRow);
|
||||
// if table's empty or error, return code of -1
|
||||
if (code < 0 || pRow == NULL) {
|
||||
return -1;
|
||||
|
@ -774,10 +1068,12 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow *
|
|||
*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||
}
|
||||
|
||||
// taosLRUCacheRelease(pCache, h, true);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) {
|
||||
#endif
|
||||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
@ -785,10 +1081,91 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) {
|
|||
getTableCacheKey(uid, "lr", key, &keyLen);
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (h) {
|
||||
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||
} else {
|
||||
STSRow *pRow = NULL;
|
||||
code = mergeLastRow(uid, pTsdb, &pRow);
|
||||
// if table's empty or error, return code of -1
|
||||
if (code < 0 || pRow == NULL) {
|
||||
*handle = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
tsdbCacheInsertLastrow(pCache, uid, pRow);
|
||||
h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||
}
|
||||
|
||||
*handle = h;
|
||||
// taosLRUCacheRelease(pCache, h, true);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
||||
getTableCacheKey(uid, "l", key, &keyLen);
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (h) {
|
||||
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||
|
||||
} else {
|
||||
STSRow *pRow = NULL;
|
||||
code = mergeLast(uid, pTsdb, &pRow);
|
||||
// if table's empty or error, return code of -1
|
||||
if (code < 0 || pRow == NULL) {
|
||||
*handle = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
tsdbCacheInsertLast(pCache, uid, pRow);
|
||||
h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||
}
|
||||
|
||||
*handle = h;
|
||||
// taosLRUCacheRelease(pCache, h, true);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
||||
getTableCacheKey(uid, "lr", key, &keyLen);
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (h) {
|
||||
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||
if (pRow->ts <= eKey) {
|
||||
taosLRUCacheRelease(pCache, h, true);
|
||||
} else {
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
}
|
||||
|
||||
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
|
||||
}
|
||||
|
||||
getTableCacheKey(uid, "l", key, &keyLen);
|
||||
h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (h) {
|
||||
// clear last cache anyway, no matter where eKey ends.
|
||||
taosLRUCacheRelease(pCache, h, true);
|
||||
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t
|
||||
// keyLen);
|
||||
|
||||
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
|
||||
int32_t code = 0;
|
||||
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -22,15 +22,15 @@ typedef struct SLastrowReader {
|
|||
SVnode* pVnode;
|
||||
STSchema* pSchema;
|
||||
uint64_t uid;
|
||||
// int32_t* pSlotIds;
|
||||
char** transferBuf; // todo remove it soon
|
||||
int32_t numOfCols;
|
||||
int32_t type;
|
||||
int32_t tableIndex; // currently returned result tables
|
||||
SArray* pTableList; // table id list
|
||||
// int32_t* pSlotIds;
|
||||
char** transferBuf; // todo remove it soon
|
||||
int32_t numOfCols;
|
||||
int32_t type;
|
||||
int32_t tableIndex; // currently returned result tables
|
||||
SArray* pTableList; // table id list
|
||||
} SLastrowReader;
|
||||
|
||||
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t *slotIds) {
|
||||
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) {
|
||||
int32_t numOfRows = pBlock->info.rows;
|
||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
|
||||
|
@ -60,21 +60,21 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade
|
|||
pBlock->info.rows += 1;
|
||||
}
|
||||
|
||||
|
||||
int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols, void** pReader) {
|
||||
int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols,
|
||||
void** pReader) {
|
||||
SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader));
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
p->type = type;
|
||||
p->pVnode = pVnode;
|
||||
p->numOfCols = numOfCols;
|
||||
p->type = type;
|
||||
p->pVnode = pVnode;
|
||||
p->numOfCols = numOfCols;
|
||||
p->transferBuf = taosMemoryCalloc(p->numOfCols, POINTER_BYTES);
|
||||
|
||||
STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0);
|
||||
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1);
|
||||
p->pTableList = pTableIdList;
|
||||
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1);
|
||||
p->pTableList = pTableIdList;
|
||||
#if 0
|
||||
for(int32_t i = 0; i < p->numOfCols; ++i) {
|
||||
for(int32_t j = 0; j < p->pSchema->numOfCols; ++j) {
|
||||
|
@ -101,7 +101,7 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
|
|||
int32_t tsdbLastrowReaderClose(void* pReader) {
|
||||
SLastrowReader* p = pReader;
|
||||
|
||||
for(int32_t i = 0; i < p->numOfCols; ++i) {
|
||||
for (int32_t i = 0; i < p->numOfCols; ++i) {
|
||||
taosMemoryFreeClear(p->transferBuf[i]);
|
||||
}
|
||||
|
||||
|
@ -117,8 +117,9 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
|
|||
|
||||
SLastrowReader* pr = pReader;
|
||||
|
||||
STSRow* pRow = NULL;
|
||||
size_t numOfTables = taosArrayGetSize(pr->pTableList);
|
||||
LRUHandle* h = NULL;
|
||||
STSRow* pRow = NULL;
|
||||
size_t numOfTables = taosArrayGetSize(pr->pTableList);
|
||||
|
||||
// retrieve the only one last row of all tables in the uid list.
|
||||
if (pr->type == LASTROW_RETRIEVE_TYPE_SINGLE) {
|
||||
|
@ -126,16 +127,18 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
|
|||
bool internalResult = false;
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
|
||||
|
||||
int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow);
|
||||
|
||||
/* int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); */
|
||||
int32_t code = tsdbCacheGetLastrowH(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pRow == NULL) {
|
||||
if (h == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pRow = (STSRow*)taosLRUCacheValue(pr->pVnode->pTsdb->lruCache, h);
|
||||
if (pRow->ts > lastKey) {
|
||||
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
|
||||
// appended or not.
|
||||
|
@ -147,23 +150,29 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
|
|||
internalResult = true;
|
||||
lastKey = pRow->ts;
|
||||
}
|
||||
|
||||
tsdbCacheRelease(pr->pVnode->pTsdb->lruCache, h);
|
||||
}
|
||||
} else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) {
|
||||
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
|
||||
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
|
||||
|
||||
int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow);
|
||||
/* int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); */
|
||||
int32_t code = tsdbCacheGetLastrowH(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// no data in the table of Uid
|
||||
if (pRow == NULL) {
|
||||
if (h == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pRow = (STSRow*)taosLRUCacheValue(pr->pVnode->pTsdb->lruCache, h);
|
||||
saveOneRow(pRow, pResBlock, pr, slotIds);
|
||||
|
||||
tsdbCacheRelease(pr->pVnode->pTsdb->lruCache, h);
|
||||
|
||||
pr->tableIndex += 1;
|
||||
if (pResBlock->info.rows >= pResBlock->info.capacity) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -180,7 +180,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
|||
pMemTable->nDel++;
|
||||
|
||||
if (tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
|
||||
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid);
|
||||
tsdbCacheDelete(pTsdb->lruCache, pTbData->uid, eKey);
|
||||
}
|
||||
|
||||
tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
||||
|
|
|
@ -984,8 +984,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
|||
}
|
||||
} else {
|
||||
SColVal cv = {0};
|
||||
|
||||
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]);
|
||||
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i] - 1);
|
||||
for (int32_t j = 0; j < pBlockData->nRow; ++j) {
|
||||
tColDataGetValue(pData, j, &cv);
|
||||
colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull);
|
||||
|
@ -994,7 +993,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
|||
}
|
||||
|
||||
pReader->pResBlock->info.rows = pBlockData->nRow;
|
||||
setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData);
|
||||
setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData);
|
||||
|
||||
/*
|
||||
int32_t ret = tsdbLoadBlockDataCols(&(pReader->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
|
||||
|
@ -2283,7 +2282,7 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock)
|
|||
}
|
||||
|
||||
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
|
||||
return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/;
|
||||
return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer);
|
||||
}
|
||||
|
||||
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) {
|
||||
|
@ -2616,7 +2615,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) {
|
||||
static int32_t buildBlockFromBufferSeqentially(STsdbReader* pReader) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
while(1) {
|
||||
|
@ -2644,7 +2643,7 @@ static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t loadDataInFiles(STsdbReader* pReader) {
|
||||
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SFileSetIter* pFIter = &pStatus->fileIter;
|
||||
|
||||
|
@ -3018,7 +3017,7 @@ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
|
|||
// // check if the query range overlaps with the file data block
|
||||
// bool exists = true;
|
||||
|
||||
// int32_t code = loadDataInFiles(pTsdbReadHandle, &exists);
|
||||
// int32_t code = buildBlockFromFiles(pTsdbReadHandle, &exists);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// pTsdbReadHandle->checkFiles = false;
|
||||
// return false;
|
||||
|
@ -3306,7 +3305,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
|||
|
||||
if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) {
|
||||
if (pStatus->loadFromFile) {
|
||||
int32_t code = loadDataInFiles(pReader);
|
||||
int32_t code = buildBlockFromFiles(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
|
@ -3314,11 +3313,11 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
|||
if (pBlock->info.rows > 0) {
|
||||
return true;
|
||||
} else {
|
||||
buildInmemBlockSeqentially(pReader);
|
||||
buildBlockFromBufferSeqentially(pReader);
|
||||
return pBlock->info.rows > 0;
|
||||
}
|
||||
} else { // no data in files, let's try the buffer
|
||||
buildInmemBlockSeqentially(pReader);
|
||||
buildBlockFromBufferSeqentially(pReader);
|
||||
return pBlock->info.rows > 0;
|
||||
}
|
||||
} else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) {
|
||||
|
@ -3334,7 +3333,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
|||
// if (pReader->checkFiles) {
|
||||
// // check if the query range overlaps with the file data block
|
||||
// bool exists = true;
|
||||
// int32_t code = loadDataInFiles(pReader, &exists);
|
||||
// int32_t code = buildBlockFromFiles(pReader, &exists);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// pReader->activeIndex = 0;
|
||||
// pReader->checkFiles = false;
|
||||
|
@ -3454,7 +3453,16 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
|||
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
|
||||
int32_t code = tBlockDataInit(&pStatus->fileBlockData);
|
||||
doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pReader->pResBlock->pDataBlock;
|
||||
}
|
||||
|
|
|
@ -738,98 +738,135 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock,
|
||||
int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1,
|
||||
uint8_t **ppBuf2) {
|
||||
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD;
|
||||
SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock];
|
||||
int32_t code = 0;
|
||||
int64_t offset;
|
||||
int64_t size;
|
||||
int64_t n;
|
||||
|
||||
tBlockDataReset(pBlockData);
|
||||
pBlockData->nRow = pSubBlock->nRow;
|
||||
|
||||
// TSDBKEY
|
||||
offset = pSubBlock->offset + sizeof(SBlockDataHdr);
|
||||
size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
|
||||
code = tsdbRealloc(ppBuf1, size);
|
||||
if (code) goto _err;
|
||||
|
||||
n = taosLSeekFile(pFD, offset, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
n = taosReadFile(pFD, *ppBuf1, size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
} else if (n < size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2);
|
||||
if (code) goto _err;
|
||||
|
||||
// OTHER
|
||||
SBlockCol blockCol;
|
||||
SBlockCol *pBlockCol = &blockCol;
|
||||
SColData *pColData;
|
||||
for (int32_t iCol = 0; iCol < nCol; iCol++) {
|
||||
int16_t cid = aColId[iCol];
|
||||
|
||||
if (tMapDataSearch(&pSubBlock->mBlockCol, &(SBlockCol){.cid = cid}, tGetBlockCol, tBlockColCmprFn, pBlockCol) ==
|
||||
0) {
|
||||
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData);
|
||||
if (code) goto _err;
|
||||
|
||||
tColDataReset(pColData, pBlockCol->cid, pBlockCol->type);
|
||||
if (pBlockCol->flag == HAS_NULL) {
|
||||
for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) {
|
||||
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
|
||||
if (code) goto _err;
|
||||
}
|
||||
} else {
|
||||
offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->vsize + pSubBlock->ksize + pBlockCol->offset;
|
||||
size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
|
||||
|
||||
code = tsdbRealloc(ppBuf1, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// seek
|
||||
n = taosLSeekFile(pFD, offset, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// read
|
||||
n = taosReadFile(pFD, *ppBuf1, size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
} else if (n < size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2);
|
||||
if (code) goto _err;
|
||||
}
|
||||
}
|
||||
}
|
||||
return code;
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
|
||||
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) {
|
||||
int32_t code = 0;
|
||||
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD;
|
||||
uint8_t *pBuf1 = NULL;
|
||||
uint8_t *pBuf2 = NULL;
|
||||
int32_t code = 0;
|
||||
uint8_t *pBuf1 = NULL;
|
||||
uint8_t *pBuf2 = NULL;
|
||||
|
||||
ASSERT(nCol == 0 || aColId[0] != PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||
|
||||
if (!ppBuf1) ppBuf1 = &pBuf1;
|
||||
if (!ppBuf2) ppBuf2 = &pBuf2;
|
||||
|
||||
for (int32_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
|
||||
SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock];
|
||||
int64_t offset;
|
||||
int64_t size;
|
||||
int64_t n;
|
||||
code = tsdbReadColDataImpl(pReader, pBlockIdx, pBlock, 0, aColId, nCol, pBlockData, ppBuf1, ppBuf2);
|
||||
if (code) goto _err;
|
||||
|
||||
tBlockDataReset(pBlockData);
|
||||
pBlockData->nRow = pSubBlock->nRow;
|
||||
if (pBlock->nSubBlock > 1) {
|
||||
SBlockData *pBlockData1 = &(SBlockData){0};
|
||||
SBlockData *pBlockData2 = &(SBlockData){0};
|
||||
|
||||
// TSDBKEY
|
||||
offset = pSubBlock->offset + sizeof(SBlockDataHdr);
|
||||
size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
|
||||
code = tsdbRealloc(ppBuf1, size);
|
||||
if (code) goto _err;
|
||||
for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
|
||||
code = tsdbReadColDataImpl(pReader, pBlockIdx, pBlock, iSubBlock, aColId, nCol, pBlockData1, ppBuf1, ppBuf2);
|
||||
if (code) goto _err;
|
||||
|
||||
n = taosLSeekFile(pFD, offset, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
code = tBlockDataCopy(pBlockData, pBlockData2);
|
||||
if (code) {
|
||||
tBlockDataClear(pBlockData1);
|
||||
tBlockDataClear(pBlockData2);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
n = taosReadFile(pFD, *ppBuf1, size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
} else if (n < size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2);
|
||||
if (code) goto _err;
|
||||
|
||||
// OTHER
|
||||
SBlockCol blockCol;
|
||||
SBlockCol *pBlockCol = &blockCol;
|
||||
SColData *pColData;
|
||||
for (int32_t iCol = 0; iCol < nCol; iCol++) {
|
||||
int16_t cid = aColId[iCol];
|
||||
|
||||
if (tMapDataSearch(&pSubBlock->mBlockCol, &(SBlockCol){.cid = cid}, tGetBlockCol, tBlockColCmprFn, pBlockCol) ==
|
||||
0) {
|
||||
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData);
|
||||
if (code) goto _err;
|
||||
|
||||
tColDataReset(pColData, pBlockCol->cid, pBlockCol->type);
|
||||
if (pBlockCol->flag == HAS_NULL) {
|
||||
for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) {
|
||||
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
|
||||
if (code) goto _err;
|
||||
}
|
||||
} else {
|
||||
offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->vsize + pSubBlock->ksize + pBlockCol->offset;
|
||||
size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
|
||||
|
||||
code = tsdbRealloc(ppBuf1, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// seek
|
||||
n = taosLSeekFile(pFD, offset, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// read
|
||||
n = taosReadFile(pFD, *ppBuf1, size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
} else if (n < size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2);
|
||||
if (code) goto _err;
|
||||
}
|
||||
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
|
||||
if (code) {
|
||||
tBlockDataClear(pBlockData1);
|
||||
tBlockDataClear(pBlockData2);
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
tBlockDataClear(pBlockData1);
|
||||
tBlockDataClear(pBlockData2);
|
||||
}
|
||||
|
||||
tsdbFree(pBuf1);
|
||||
|
@ -876,39 +913,11 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
|
|||
goto _err;
|
||||
}
|
||||
|
||||
// // check
|
||||
// p = *ppBuf1;
|
||||
// SBlockDataHdr *pHdr = (SBlockDataHdr *)p;
|
||||
// ASSERT(pHdr->delimiter == TSDB_FILE_DLMT);
|
||||
// ASSERT(pHdr->suid == pBlockIdx->suid);
|
||||
// ASSERT(pHdr->uid == pBlockIdx->uid);
|
||||
// p += sizeof(*pHdr);
|
||||
|
||||
// if (!taosCheckChecksumWhole(p, pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM))) {
|
||||
// code = TSDB_CODE_FILE_CORRUPTED;
|
||||
// goto _err;
|
||||
// }
|
||||
// p += (pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM));
|
||||
|
||||
// for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) {
|
||||
// tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol);
|
||||
|
||||
// ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
|
||||
|
||||
// if (pBlockCol->flag == HAS_NULL) continue;
|
||||
|
||||
// if (!taosCheckChecksumWhole(p, pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM))) {
|
||||
// code = TSDB_CODE_FILE_CORRUPTED;
|
||||
// goto _err;
|
||||
// }
|
||||
// p = p + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
|
||||
// }
|
||||
|
||||
// recover
|
||||
pBlockData->nRow = pSubBlock->nRow;
|
||||
p = *ppBuf1 + sizeof(SBlockDataHdr);
|
||||
|
||||
code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2);
|
||||
code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, p, ppBuf2);
|
||||
if (code) goto _err;
|
||||
p = p + pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
|
||||
|
||||
|
@ -964,7 +973,7 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p
|
|||
SBlockData *pBlockData2 = &(SBlockData){0};
|
||||
|
||||
for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
|
||||
code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2);
|
||||
code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2);
|
||||
if (code) {
|
||||
tBlockDataClear(pBlockData1);
|
||||
tBlockDataClear(pBlockData2);
|
||||
|
|
|
@ -1031,20 +1031,24 @@ static int32_t tColDataUpdateOffset(SColData *pColData) {
|
|||
|
||||
ASSERT(pColData->nVal > 0);
|
||||
ASSERT(pColData->flag);
|
||||
ASSERT(IS_VAR_DATA_TYPE(pColData->type));
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColData->type) && (pColData->flag & HAS_VALUE)) {
|
||||
if ((pColData->flag & HAS_VALUE)) {
|
||||
code = tsdbRealloc((uint8_t **)&pColData->aOffset, sizeof(int32_t) * pColData->nVal);
|
||||
if (code) goto _exit;
|
||||
|
||||
int32_t offset = 0;
|
||||
for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) {
|
||||
uint8_t v = GET_BIT2(pColData->pBitMap, iVal);
|
||||
if (v == 0 || v == 1) {
|
||||
pColData->aOffset[iVal] = -1;
|
||||
} else {
|
||||
pColData->aOffset[iVal] = offset;
|
||||
offset += tGetValue(pColData->pData + offset, &value, pColData->type);
|
||||
if (pColData->flag != HAS_VALUE) {
|
||||
uint8_t v = GET_BIT2(pColData->pBitMap, iVal);
|
||||
if (v == 0 || v == 1) {
|
||||
pColData->aOffset[iVal] = -1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
pColData->aOffset[iVal] = offset;
|
||||
offset += tGetValue(pColData->pData + offset, &value, pColData->type);
|
||||
}
|
||||
|
||||
ASSERT(offset == pColData->nData);
|
||||
|
|
|
@ -218,6 +218,8 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
break;
|
||||
case TDMT_VND_ALTER_CONFIG:
|
||||
break;
|
||||
case TDMT_VND_COMMIT:
|
||||
goto _do_commit;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
|
@ -232,6 +234,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
|
||||
// commit if need
|
||||
if (vnodeShouldCommit(pVnode)) {
|
||||
_do_commit:
|
||||
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
|
||||
// commit current change
|
||||
vnodeCommit(pVnode);
|
||||
|
|
|
@ -107,7 +107,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
|
|||
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
|
||||
|
||||
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
|
||||
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo);
|
||||
int32_t getTableList(void* metaHandle, void* vnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo);
|
||||
SArray* createSortInfo(SNodeList* pNodeList);
|
||||
SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type);
|
||||
|
|
|
@ -283,7 +283,7 @@ static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){
|
|||
return result;
|
||||
}
|
||||
|
||||
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) {
|
||||
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
|
||||
if(pListInfo->pTableList == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -303,7 +303,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
|
|||
//code = doFilterTag(pTagIndexCond, &metaArg, res);
|
||||
code = TSDB_CODE_INDEX_REBUILDING;
|
||||
if (code == TSDB_CODE_INDEX_REBUILDING) {
|
||||
code = vnodeGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
|
||||
code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList);
|
||||
} else if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid);
|
||||
taosArrayDestroy(res);
|
||||
|
@ -319,7 +319,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
|
|||
}
|
||||
taosArrayDestroy(res);
|
||||
} else {
|
||||
code = vnodeGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
|
||||
code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList);
|
||||
}
|
||||
|
||||
if(pTagCond){
|
||||
|
|
|
@ -4121,7 +4121,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
|
||||
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
|
||||
|
||||
int32_t code = getTableList(pHandle->meta, pScanPhyNode, pTableListInfo);
|
||||
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTableListInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
|
@ -4133,7 +4133,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
|
||||
|
||||
if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
|
||||
int32_t code = vnodeGetAllTableList(pHandle->meta, pBlockNode->uid, pTableListInfo->pTableList);
|
||||
int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
|
@ -4183,7 +4183,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
|
||||
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
|
||||
if (pScanNode->tableType == TSDB_SUPER_TABLE) {
|
||||
code = vnodeGetAllTableList(pHandle->meta, pScanNode->uid, pTableListInfo->pTableList);
|
||||
code = vnodeGetAllTableList(pHandle->vnode, pScanNode->uid, pTableListInfo->pTableList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
|
@ -4399,7 +4399,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
|
|||
}
|
||||
|
||||
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, const char* idstr) {
|
||||
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo);
|
||||
int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
|
|
@ -2033,7 +2033,7 @@ typedef struct STableMergeScanInfo {
|
|||
|
||||
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
|
||||
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo);
|
||||
int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -545,6 +545,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// TODO consider the page meta size
|
||||
int32_t getProperSortPageSize(size_t rowSize) {
|
||||
uint32_t defaultPageSize = 4096;
|
||||
|
||||
|
|
Loading…
Reference in New Issue