Merge branch 'feature/3_liaohj' of github.com:taosdata/tdengine into feature/3_liaohj
This commit is contained in:
commit
2c656a2cdc
|
@ -61,8 +61,6 @@ static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
|
||||||
*len = sizeof(uint64_t);
|
*len = sizeof(uint64_t);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value) { taosMemoryFree(value); }
|
|
||||||
|
|
||||||
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) { taosArrayDestroy(value); }
|
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) { taosArrayDestroy(value); }
|
||||||
|
|
||||||
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
||||||
|
@ -75,13 +73,23 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
||||||
getTableCacheKey(uid, 0, key, &keyLen);
|
getTableCacheKey(uid, 0, key, &keyLen);
|
||||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||||
if (h) {
|
if (h) {
|
||||||
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
|
||||||
if (pRow->ts <= eKey) {
|
bool invalidate = false;
|
||||||
|
int16_t nCol = taosArrayGetSize(pLast);
|
||||||
|
|
||||||
|
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
|
||||||
|
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||||
|
if (eKey >= tTsVal->ts) {
|
||||||
|
invalidate = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (invalidate) {
|
||||||
taosLRUCacheRelease(pCache, h, true);
|
taosLRUCacheRelease(pCache, h, true);
|
||||||
} else {
|
} else {
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
taosLRUCacheRelease(pCache, h, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
|
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,14 +138,23 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
||||||
getTableCacheKey(uid, 0, key, &keyLen);
|
getTableCacheKey(uid, 0, key, &keyLen);
|
||||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||||
if (h) {
|
if (h) {
|
||||||
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
|
||||||
if (pRow->ts <= eKey) {
|
bool invalidate = false;
|
||||||
|
int16_t nCol = taosArrayGetSize(pLast);
|
||||||
|
|
||||||
|
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
|
||||||
|
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||||
|
if (eKey >= tTsVal->ts) {
|
||||||
|
invalidate = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (invalidate) {
|
||||||
taosLRUCacheRelease(pCache, h, true);
|
taosLRUCacheRelease(pCache, h, true);
|
||||||
} else {
|
} else {
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
taosLRUCacheRelease(pCache, h, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// getTableCacheKey(uid, "l", key, &keyLen);
|
// getTableCacheKey(uid, "l", key, &keyLen);
|
||||||
|
@ -177,6 +194,46 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
|
||||||
getTableCacheKey(uid, 0, key, &keyLen);
|
getTableCacheKey(uid, 0, key, &keyLen);
|
||||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||||
if (h) {
|
if (h) {
|
||||||
|
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
||||||
|
TSKEY keyTs = row->ts;
|
||||||
|
bool invalidate = false;
|
||||||
|
|
||||||
|
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
|
||||||
|
int16_t nCol = taosArrayGetSize(pLast);
|
||||||
|
int16_t iCol = 0;
|
||||||
|
|
||||||
|
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||||
|
if (keyTs > tTsVal->ts) {
|
||||||
|
STColumn *pTColumn = &pTSchema->columns[0];
|
||||||
|
SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs});
|
||||||
|
|
||||||
|
taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal});
|
||||||
|
}
|
||||||
|
|
||||||
|
for (++iCol; iCol < nCol; ++iCol) {
|
||||||
|
SLastCol *tTsVal1 = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||||
|
if (keyTs >= tTsVal1->ts) {
|
||||||
|
SColVal *tColVal = &tTsVal1->colVal;
|
||||||
|
|
||||||
|
SColVal colVal = {0};
|
||||||
|
tTSRowGetVal(row, pTSchema, iCol, &colVal);
|
||||||
|
if (!COL_VAL_IS_NONE(&colVal)) {
|
||||||
|
if (keyTs == tTsVal1->ts && !COL_VAL_IS_NONE(tColVal)) {
|
||||||
|
invalidate = true;
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = colVal});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_invalidate:
|
||||||
|
taosMemoryFreeClear(pTSchema);
|
||||||
|
|
||||||
|
taosLRUCacheRelease(pCache, h, invalidate);
|
||||||
|
/*
|
||||||
cacheRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
cacheRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||||
if (row->ts >= cacheRow->ts) {
|
if (row->ts >= cacheRow->ts) {
|
||||||
if (row->ts == cacheRow->ts) {
|
if (row->ts == cacheRow->ts) {
|
||||||
|
@ -218,9 +275,9 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
|
||||||
if (status != TAOS_LRU_STATUS_OK) {
|
if (status != TAOS_LRU_STATUS_OK) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
/* tsdbCacheInsertLastrow(pCache, uid, row, dup); */
|
// tsdbCacheInsertLastrow(pCache, uid, row, dup);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}*/
|
||||||
} /*else {
|
} /*else {
|
||||||
if (dup) {
|
if (dup) {
|
||||||
cacheRow = tdRowDup(row);
|
cacheRow = tdRowDup(row);
|
||||||
|
@ -1049,7 +1106,7 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
|
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
||||||
|
@ -1073,12 +1130,14 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TSKEY rowTs = TSDBROW_TS(pRow);
|
||||||
|
|
||||||
if (lastRowTs == TSKEY_MAX) {
|
if (lastRowTs == TSKEY_MAX) {
|
||||||
lastRowTs = TSDBROW_TS(pRow);
|
lastRowTs = rowTs;
|
||||||
STColumn *pTColumn = &pTSchema->columns[0];
|
STColumn *pTColumn = &pTSchema->columns[0];
|
||||||
|
|
||||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
|
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
|
||||||
if (taosArrayPush(pColArray, pColVal) == NULL) {
|
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -1086,7 +1145,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
||||||
for (iCol = 1; iCol < nCol; ++iCol) {
|
for (iCol = 1; iCol < nCol; ++iCol) {
|
||||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||||
|
|
||||||
if (taosArrayPush(pColArray, pColVal) == NULL) {
|
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -1097,15 +1156,15 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!setNoneCol) {
|
if (!setNoneCol) {
|
||||||
// goto build the result ts row
|
// done, goto return pColArray
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((TSDBROW_TS(pRow) < lastRowTs)) {
|
if ((rowTs < lastRowTs)) {
|
||||||
// goto build the result ts row
|
// done, goto return pColArray
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1117,7 +1176,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
||||||
|
|
||||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||||
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
|
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
|
||||||
taosArraySet(pColArray, iCol, pColVal);
|
taosArraySet(pColArray, iCol, &(SLastCol){.ts = rowTs, .colVal = *pColVal});
|
||||||
} else if (COL_VAL_IS_NONE(tColVal) && COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
|
} else if (COL_VAL_IS_NONE(tColVal) && COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
|
||||||
noneCol = iCol;
|
noneCol = iCol;
|
||||||
setNoneCol = true;
|
setNoneCol = true;
|
||||||
|
@ -1128,14 +1187,13 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
||||||
// build the result ts row here
|
// build the result ts row here
|
||||||
*dup = false;
|
*dup = false;
|
||||||
if (taosArrayGetSize(pColArray) == nCol) {
|
if (taosArrayGetSize(pColArray) == nCol) {
|
||||||
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
|
*ppColArray = NULL;
|
||||||
if (code) goto _err;
|
taosArrayDestroy(pColArray);
|
||||||
} else {
|
} else {
|
||||||
*ppRow = NULL;
|
*ppColArray = pColArray;
|
||||||
}
|
}
|
||||||
|
|
||||||
nextRowIterClose(&iter);
|
nextRowIterClose(&iter);
|
||||||
taosArrayDestroy(pColArray);
|
|
||||||
taosMemoryFreeClear(pTSchema);
|
taosMemoryFreeClear(pTSchema);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
@ -1196,7 +1254,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!setNoneCol) {
|
if (!setNoneCol) {
|
||||||
// goto build the result ts row
|
// done, goto return pColArray
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
continue;
|
continue;
|
||||||
|
@ -1220,7 +1278,6 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
|
||||||
}
|
}
|
||||||
} while (setNoneCol);
|
} while (setNoneCol);
|
||||||
|
|
||||||
// build the result ts row here
|
|
||||||
if (taosArrayGetSize(pColArray) <= 0) {
|
if (taosArrayGetSize(pColArray) <= 0) {
|
||||||
*ppLastArray = NULL;
|
*ppLastArray = NULL;
|
||||||
taosArrayDestroy(pColArray);
|
taosArrayDestroy(pColArray);
|
||||||
|
@ -1253,13 +1310,13 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
|
||||||
|
|
||||||
h = taosLRUCacheLookup(pCache, key, keyLen);
|
h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||||
if (!h) {
|
if (!h) {
|
||||||
STSRow *pRow = NULL;
|
SArray *pArray = NULL;
|
||||||
bool dup = false; // which is always false for now
|
bool dup = false; // which is always false for now
|
||||||
code = mergeLastRow(uid, pTsdb, &dup, &pRow);
|
code = mergeLastRow(uid, pTsdb, &dup, &pArray);
|
||||||
// if table's empty or error, return code of -1
|
// if table's empty or error, return code of -1
|
||||||
if (code < 0 || pRow == NULL) {
|
if (code < 0 || pArray == NULL) {
|
||||||
if (!dup && pRow) {
|
if (!dup && pArray) {
|
||||||
taosMemoryFree(pRow);
|
taosArrayDestroy(pArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||||
|
@ -1269,9 +1326,9 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
|
_taos_lru_deleter_t deleter = deleteTableCacheLast;
|
||||||
LRUStatus status =
|
LRUStatus status =
|
||||||
taosLRUCacheInsert(pCache, key, keyLen, pRow, TD_ROW_LEN(pRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW);
|
taosLRUCacheInsert(pCache, key, keyLen, pArray, pArray->capacity, deleter, NULL, TAOS_LRU_PRIORITY_LOW);
|
||||||
if (status != TAOS_LRU_STATUS_OK) {
|
if (status != TAOS_LRU_STATUS_OK) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
|
@ -1329,7 +1386,6 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
|
||||||
SArray *pLastArray = NULL;
|
SArray *pLastArray = NULL;
|
||||||
code = mergeLast(uid, pTsdb, &pLastArray);
|
code = mergeLast(uid, pTsdb, &pLastArray);
|
||||||
// if table's empty or error, return code of -1
|
// if table's empty or error, return code of -1
|
||||||
// if (code < 0 || pRow == NULL) {
|
|
||||||
if (code < 0 || pLastArray == NULL) {
|
if (code < 0 || pLastArray == NULL) {
|
||||||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
|
|
|
@ -128,7 +128,8 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint
|
||||||
|
|
||||||
// no data in the table of Uid
|
// no data in the table of Uid
|
||||||
if (*h != NULL) {
|
if (*h != NULL) {
|
||||||
*pRow = (STSRow*)taosLRUCacheValue(lruCache, *h);
|
//*pRow = (STSRow*)taosLRUCacheValue(lruCache, *h);
|
||||||
|
SArray* pLastrow = (SArray*)taosLRUCacheValue(lruCache, *h);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h);
|
code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h);
|
||||||
|
|
Loading…
Reference in New Issue