diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h
index 9d7cfc0552..eac4f06132 100644
--- a/source/dnode/vnode/src/inc/tsdb.h
+++ b/source/dnode/vnode/src/inc/tsdb.h
@@ -251,7 +251,8 @@ int32_t tsdbOpenCache(STsdb *pTsdb);
 void tsdbCloseCache(SLRUCache *pCache);
 int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
 int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow);
-int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid);
+int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
+int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
 
 // structs =======================
 typedef struct {
diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c
index 2f6901cb24..12f323e92e 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCache.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCache.c
@@ -65,7 +65,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
     if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) {
       tdRowCpy(cacheRow, row);
     } else {
-      tsdbCacheDeleteLastrow(pCache, uid);
+      tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX);
       tsdbCacheInsertLastrow(pCache, uid, row);
     }
   }
@@ -97,7 +97,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
     if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) {
       tdRowCpy(cacheRow, row);
     } else {
-      tsdbCacheDeleteLastrow(pCache, uid);
+      tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX);
       tsdbCacheInsertLastrow(pCache, uid, row);
     }
   }
@@ -581,6 +581,10 @@ typedef struct TsdbNextRowState {
 static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
   int32_t code = 0;
 
+  STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
+  int16_t nCol = pTSchema->numOfCols;
+  SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
+
   tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
 
   STbData *pMem = NULL;
@@ -597,6 +601,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
 
   SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
 
+  SDelIdx delIdx;
+
   SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
   if (pDelFile) {
     SDelFReader *pDelFReader;
@@ -604,7 +610,6 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
     code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
     if (code) goto _err;
 
-    SDelIdx delIdx;
     code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
     if (code) goto _err;
 
@@ -612,6 +617,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
     if (code) goto _err;
 
     tsdbDelFReaderClose(pDelFReader);
+  } else {
+    code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline);
+    if (code) goto _err;
   }
 
   int iSkyline = taosArrayGetSize(pSkyline) - 1;
@@ -644,7 +652,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
     input[1].next = true;
   }
 
-  STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
+  int16_t nilColCount = nCol - 1;  // count of null & none cols
+  int iCol = 0;  // index of first nil col index from left to right
+  bool setICol = false;
 
   do {
     for (int i = 0; i < 3; ++i) {
@@ -667,15 +677,17 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
     TSDBROW *max[3] = {0};
     int iMax[3] = {-1, -1, -1};
     int nMax = 0;
+    TSKEY maxKey = TSKEY_MIN;
+
     for (int i = 0; i < 3; ++i) {
-      if (input[i].pRow != NULL) {
+      if (!input[i].stop && input[i].pRow != NULL) {
         TSDBKEY key = TSDBROW_KEY(input[i].pRow);
-        TSDBKEY maxKey = TSDBROW_KEY(max[nMax]);
 
         // merging & deduplicating on client side
-        if (maxKey.ts <= key.ts) {
-          if (maxKey.ts < key.ts) {
+        if (maxKey <= key.ts) {
+          if (maxKey < key.ts) {
             nMax = 0;
+            maxKey = key.ts;
           }
 
           iMax[nMax] = i;
@@ -686,6 +698,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
 
     // delete detection
     TSDBROW *merge[3] = {0};
+    int iMerge[3] = {-1, -1, -1};
     int nMerge = 0;
     for (int i = 0; i < nMax; ++i) {
       TSDBKEY maxKey = TSDBROW_KEY(max[i]);
@@ -693,6 +706,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
       // bool deleted = false;
       bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
       if (!deleted) {
+        iMerge[nMerge] = i;
         merge[nMerge++] = max[i];
       }
 
@@ -716,6 +730,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
         tRowMergerClear(&merger);
       }
     }
+
   } while (*ppRow == NULL);
 
   taosMemoryFreeClear(pTSchema);
@@ -727,6 +742,271 @@ _err:
   return code;
 }
 
+/*
+ * Assemble the "last" row of table `uid`: the timestamp of the first live row
+ * found plus, for every other column, the first value encountered that is
+ * neither null nor none.
+ *
+ * Each round takes the row(s) with the largest timestamp among mem, imem and
+ * the file system, drops those tsdbKeyDeleted() reports as deleted, merges
+ * the survivors, and fills still-nil column slots from the merged row. The
+ * loop ends when no slot is nil or all three inputs are exhausted.
+ *
+ * Returns 0 on success, otherwise the code of the failing call. *ppRow is
+ * reset to NULL before scanning starts.
+ */
+static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
+  int32_t code = 0;
+
+  STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
+  int16_t nCol = pTSchema->numOfCols;
+  SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
+
+  tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
+
+  STbData *pMem = NULL;
+  if (pTsdb->mem) {
+    tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
+  }
+
+  STbData *pIMem = NULL;
+  if (pTsdb->imem) {
+    tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
+  }
+
+  *ppRow = NULL;
+
+  SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
+
+  SDelIdx delIdx;
+
+  SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
+  if (pDelFile) {
+    SDelFReader *pDelFReader;
+
+    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
+    if (code) goto _err;
+
+    code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
+    if (code) goto _err;
+
+    code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline);
+    if (code) goto _err;
+
+    tsdbDelFReaderClose(pDelFReader);
+  } else {
+    code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline);
+    if (code) goto _err;
+  }
+
+  int iSkyline = taosArrayGetSize(pSkyline) - 1;
+
+  SBlockIdx idx = {.suid = suid, .uid = uid};
+
+  SFSNextRowIter fsState = {0};
+  fsState.state = SFSNEXTROW_FS;
+  fsState.pTsdb = pTsdb;
+  fsState.pBlockIdxExp = &idx;
+
+  SMemNextRowIter memState = {0};
+  SMemNextRowIter imemState = {0};
+  TSDBROW memRow, imemRow, fsRow;
+
+  TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem},
+                               {&imemRow, true, false, &imemState, getNextRowFromMem},
+                               {&fsRow, false, true, &fsState, getNextRowFromFS}};
+
+  if (pMem) {
+    memState.pMem = pMem;
+    memState.state = SMEMNEXTROW_ENTER;
+    input[0].stop = false;
+    input[0].next = true;
+  }
+  if (pIMem) {
+    imemState.pMem = pIMem;
+    imemState.state = SMEMNEXTROW_ENTER;
+    input[1].stop = false;
+    input[1].next = true;
+  }
+
+  int16_t nilColCount = nCol - 1;  // count of null & none cols
+  int iCol = 0;                    // index of first nil col from left to right
+  bool setICol = false;
+
+  do {
+    for (int i = 0; i < 3; ++i) {
+      if (input[i].next && !input[i].stop) {
+        code = input[i].nextRowFn(input[i].iter, &input[i].pRow);
+        if (code) goto _err;
+
+        if (input[i].pRow == NULL) {
+          input[i].stop = true;
+          input[i].next = false;
+        }
+      }
+    }
+
+    if (input[0].stop && input[1].stop && input[2].stop) {
+      break;
+    }
+
+    // select maxpoint(s) from mem, imem, fs
+    TSDBROW *max[3] = {0};
+    int iMax[3] = {-1, -1, -1};
+    int nMax = 0;
+    TSKEY maxKey = TSKEY_MIN;
+
+    for (int i = 0; i < 3; ++i) {
+      if (!input[i].stop && input[i].pRow != NULL) {
+        TSDBKEY key = TSDBROW_KEY(input[i].pRow);
+
+        // merging & deduplicating on client side
+        if (maxKey <= key.ts) {
+          if (maxKey < key.ts) {
+            nMax = 0;
+            maxKey = key.ts;
+          }
+
+          iMax[nMax] = i;
+          max[nMax++] = input[i].pRow;
+        }
+      }
+    }
+
+    // delete detection
+    TSDBROW *merge[3] = {0};
+    int iMerge[3] = {-1, -1, -1};  // merge[j] was read from input[iMerge[j]]
+    int nMerge = 0;
+    for (int i = 0; i < nMax; ++i) {
+      TSDBKEY rowKey = TSDBROW_KEY(max[i]);
+
+      bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, &iSkyline);
+      if (!deleted) {
+        // store the input[] index, not the max[] index: it is used below to
+        // flag which input must advance
+        iMerge[nMerge] = iMax[i];
+        merge[nMerge++] = max[i];
+      }
+
+      input[iMax[i]].next = deleted;
+    }
+
+    if (nMerge == 0) {
+      // every candidate at this timestamp is deleted, so there is no row to
+      // read columns from; the inputs were flagged to advance just above
+      continue;
+    }
+
+    // NOTE(review): *ppRow is overwritten on every round and again by
+    // tdSTSRowNew() after the loop; if these calls allocate, earlier rows
+    // leak -- TODO confirm ownership.
+    if (nMerge == 1) {
+      code = tsRowFromTsdbRow(pTSchema, merge[0], ppRow);
+      if (code) goto _err;
+    } else {
+      // merge 2 or 3 rows
+      SRowMerger merger = {0};
+
+      tRowMergerInit(&merger, merge[0], pTSchema);
+      for (int i = 1; i < nMerge; ++i) {
+        tRowMerge(&merger, merge[i]);
+      }
+      tRowMergerGetRow(&merger, ppRow);
+      tRowMergerClear(&merger);
+    }
+
+    if (iCol == 0) {
+      // first live row: its timestamp becomes the ts column of the result
+      STColumn *pTColumn = &pTSchema->columns[0];
+      SColVal *pColVal = &(SColVal){0};
+
+      *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = maxKey});
+
+      if (taosArrayPush(pColArray, pColVal) == NULL) {
+        code = TSDB_CODE_OUT_OF_MEMORY;
+        goto _err;
+      }
+
+      ++iCol;
+
+      setICol = false;
+      for (int16_t i = iCol; i < nCol; ++i) {
+        tTSRowGetVal(*ppRow, pTSchema, i, pColVal);
+        if (taosArrayPush(pColArray, pColVal) == NULL) {
+          code = TSDB_CODE_OUT_OF_MEMORY;
+          goto _err;
+        }
+
+        if (pColVal->isNull || pColVal->isNone) {
+          for (int j = 0; j < nMerge; ++j) {
+            SColVal jColVal = {0};
+            tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal);
+            if (jColVal.isNull || jColVal.isNone) {
+              input[iMerge[j]].next = true;
+            }
+          }
+          if (!setICol) {
+            iCol = i;
+            setICol = true;
+          }
+        } else {
+          --nilColCount;
+        }
+      }
+
+      continue;
+    }
+
+    setICol = false;
+    for (int16_t i = iCol; i < nCol; ++i) {
+      SColVal colVal = {0};
+      tTSRowGetVal(*ppRow, pTSchema, i, &colVal);
+
+      SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, i);
+
+      if (!colVal.isNone && !colVal.isNull) {
+        if (tColVal->isNull || tColVal->isNone) {
+          taosArraySet(pColArray, i, &colVal);
+          --nilColCount;
+        }
+      } else if (tColVal->isNull || tColVal->isNone) {
+        // still nil: advance every input whose row is nil in this column too
+        for (int j = 0; j < nMerge; ++j) {
+          SColVal jColVal = {0};
+          tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal);
+          if (jColVal.isNull || jColVal.isNone) {
+            input[iMerge[j]].next = true;
+          }
+        }
+        // only the first still-nil slot may become iCol
+        if (!setICol) {
+          iCol = i;
+          setICol = true;
+        }
+      }
+    }
+  } while (nilColCount > 0);
+
+  // the array holds nCol entries only after a live row filled every slot
+  if (taosArrayGetSize(pColArray) == nCol) {
+    code = tdSTSRowNew(pColArray, pTSchema, ppRow);
+    if (code) goto _err;
+  }
+
+  taosArrayDestroy(pSkyline);
+  taosArrayDestroy(pColArray);
+  taosMemoryFreeClear(pTSchema);
+
+  return code;
+
+_err:
+  taosArrayDestroy(pSkyline);
+  taosArrayDestroy(pColArray);
+  taosMemoryFreeClear(pTSchema);
+  tsdbError("vgId:%d merge last failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
+  return code;
+}
+
 int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
   int32_t code = 0;
   char key[32] = {0};
@@ -749,6 +1029,8 @@ int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRo
     *ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
   }
 
+  // taosLRUCacheRelease(pCache, h, true);
+
   return code;
 }
 
@@ -763,7 +1045,7 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow *
     *ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
   } else {
     STSRow *pRow = NULL;
-    // code = mergeLast(uid, pTsdb, &pRow);
+    code = mergeLast(uid, pTsdb, &pRow);
     // if table's empty or error, return code of -1
     if (code < 0 || pRow == NULL) {
       return -1;
@@ -774,10 +1056,17 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow *
     *ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
   }
 
+  // taosLRUCacheRelease(pCache, h, true);
+
   return code;
 }
 
-int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) {
+/*
+ * After a delete ending at `eKey`, release the cached "lr" handle of `uid`
+ * with the last argument true only when the cached row's ts <= eKey; the
+ * cached "l" handle is always released with it true. Always returns 0.
+ */
+int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
   int32_t code = 0;
   char key[32] = {0};
   int keyLen = 0;
@@ -785,10 +1074,36 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) {
   getTableCacheKey(uid, "lr", key, &keyLen);
   LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
   if (h) {
+    STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h);
+    if (pRow->ts <= eKey) {
+      taosLRUCacheRelease(pCache, h, true);
+    } else {
+      taosLRUCacheRelease(pCache, h, false);
+    }
+
+    // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
+  }
+
+  getTableCacheKey(uid, "l", key, &keyLen);
+  h = taosLRUCacheLookup(pCache, key, keyLen);
+  if (h) {
+    // clear last cache anyway, no matter where eKey ends.
     taosLRUCacheRelease(pCache, h, true);
-    // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t
-    // keyLen);
+
+    // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
   }
 
   return code;
 }
+
+/*
+ * Release LRU handle `h` on `pCache` via taosLRUCacheRelease() with its last
+ * argument false. Always returns 0.
+ */
+int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
+  int32_t code = 0;
+
+  taosLRUCacheRelease(pCache, h, false);
+
+  return code;
+}
diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c
index 537fa5b866..a9c22b6b7f 100644
--- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c
+++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c
@@ -180,7 +180,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
   pMemTable->nDel++;
 
   if (tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
-    tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid);
+    tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);
   }
 
   tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64