[td-4927]<enhance>:ignore the rows of expired timestamp that are not dropped yet during query according to the TTL settings.
This commit is contained in:
parent
839ad25fee
commit
6a288d80fb
|
@ -155,7 +155,7 @@ extern char tsMnodeTmpDir[];
|
||||||
extern char tsDataDir[];
|
extern char tsDataDir[];
|
||||||
extern char tsLogDir[];
|
extern char tsLogDir[];
|
||||||
extern char tsScriptDir[];
|
extern char tsScriptDir[];
|
||||||
extern int64_t tsMsPerDay[3];
|
extern int64_t tsTickPerDay[3];
|
||||||
|
|
||||||
// system info
|
// system info
|
||||||
extern char tsOsName[];
|
extern char tsOsName[];
|
||||||
|
|
|
@ -204,7 +204,7 @@ SDiskCfg tsDiskCfg[TSDB_MAX_DISKS];
|
||||||
* TSDB_TIME_PRECISION_MICRO: 86400000000L
|
* TSDB_TIME_PRECISION_MICRO: 86400000000L
|
||||||
* TSDB_TIME_PRECISION_NANO: 86400000000000L
|
* TSDB_TIME_PRECISION_NANO: 86400000000000L
|
||||||
*/
|
*/
|
||||||
int64_t tsMsPerDay[] = {86400000L, 86400000000L, 86400000000000L};
|
int64_t tsTickPerDay[] = {86400000L, 86400000000L, 86400000000000L};
|
||||||
|
|
||||||
// system info
|
// system info
|
||||||
char tsOsName[10] = "Linux";
|
char tsOsName[10] = "Linux";
|
||||||
|
|
|
@ -350,8 +350,8 @@ static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY* minKey, TSKEY* maxKey) {
|
static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY* minKey, TSKEY* maxKey) {
|
||||||
*minKey = fid * days * tsMsPerDay[precision];
|
*minKey = fid * days * tsTickPerDay[precision];
|
||||||
*maxKey = *minKey + days * tsMsPerDay[precision] - 1;
|
*maxKey = *minKey + days * tsTickPerDay[precision] - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {
|
static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {
|
||||||
|
|
|
@ -17,9 +17,9 @@
|
||||||
#define TSDB_MAX_SUBBLOCKS 8
|
#define TSDB_MAX_SUBBLOCKS 8
|
||||||
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
|
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
|
||||||
if (key < 0) {
|
if (key < 0) {
|
||||||
return (int)((key + 1) / tsMsPerDay[precision] / days - 1);
|
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
|
||||||
} else {
|
} else {
|
||||||
return (int)((key / tsMsPerDay[precision] / days));
|
return (int)((key / tsTickPerDay[precision] / days));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -363,9 +363,9 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) {
|
||||||
TSKEY minKey, midKey, maxKey, now;
|
TSKEY minKey, midKey, maxKey, now;
|
||||||
|
|
||||||
now = taosGetTimestamp(pCfg->precision);
|
now = taosGetTimestamp(pCfg->precision);
|
||||||
minKey = now - pCfg->keep * tsMsPerDay[pCfg->precision];
|
minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision];
|
||||||
midKey = now - pCfg->keep2 * tsMsPerDay[pCfg->precision];
|
midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision];
|
||||||
maxKey = now - pCfg->keep1 * tsMsPerDay[pCfg->precision];
|
maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision];
|
||||||
|
|
||||||
pRtn->minKey = minKey;
|
pRtn->minKey = minKey;
|
||||||
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision));
|
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision));
|
||||||
|
|
|
@ -632,8 +632,8 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
|
||||||
SSubmitBlkIter blkIter = {0};
|
SSubmitBlkIter blkIter = {0};
|
||||||
SDataRow row = NULL;
|
SDataRow row = NULL;
|
||||||
TSKEY now = taosGetTimestamp(pRepo->config.precision);
|
TSKEY now = taosGetTimestamp(pRepo->config.precision);
|
||||||
TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep;
|
TSKEY minKey = now - tsTickPerDay[pRepo->config.precision] * pRepo->config.keep;
|
||||||
TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
|
TSKEY maxKey = now + tsTickPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
pMsg->length = htonl(pMsg->length);
|
pMsg->length = htonl(pMsg->length);
|
||||||
|
|
|
@ -280,9 +280,17 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa
|
||||||
info.tableId.uid = info.pTableObj->tableId.uid;
|
info.tableId.uid = info.pTableObj->tableId.uid;
|
||||||
|
|
||||||
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
assert(info.lastKey >= pQueryHandle->window.skey);
|
if (info.lastKey == INT64_MIN) {
|
||||||
|
info.lastKey = pQueryHandle->window.skey;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(info.lastKey >= pQueryHandle->window.skey && info.lastKey <= pQueryHandle->window.ekey);
|
||||||
} else {
|
} else {
|
||||||
assert(info.lastKey <= pQueryHandle->window.skey);
|
if (info.lastKey == INT64_MIN) {
|
||||||
|
info.lastKey = pQueryHandle->window.ekey;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(info.lastKey >= pQueryHandle->window.ekey && info.lastKey <= pQueryHandle->window.skey);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pTableCheckInfo, &info);
|
taosArrayPush(pTableCheckInfo, &info);
|
||||||
|
@ -339,14 +347,44 @@ static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY s
|
||||||
return pNew;
|
return pNew;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
|
||||||
|
// the expired data to client, even it is queried already.
|
||||||
|
static int64_t getEarliestValidTimestamp(STsdbRepo* pTsdb) {
|
||||||
|
STsdbCfg* pCfg = &pTsdb->config;
|
||||||
|
|
||||||
|
int64_t now = taosGetTimestamp(pCfg->precision);
|
||||||
|
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void setQueryTimewindow(STsdbQueryHandle* pQueryHandle, STsdbQueryCond* pCond) {
|
||||||
|
pQueryHandle->window = pCond->twindow;
|
||||||
|
|
||||||
|
int64_t startTs = getEarliestValidTimestamp(pQueryHandle->pTsdb);
|
||||||
|
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
|
if (startTs > pQueryHandle->window.skey) {
|
||||||
|
pQueryHandle->window.skey = startTs;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey);
|
||||||
|
} else {
|
||||||
|
if (startTs > pQueryHandle->window.ekey) {
|
||||||
|
pQueryHandle->window.ekey = startTs;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(pQueryHandle->window.skey >= pQueryHandle->window.ekey);
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbDebug("%p update the query time window, old:%"PRId64" - %"PRId64", new:%"PRId64" - %"PRId64 ", 0x%"PRIx64,
|
||||||
|
pQueryHandle, pCond->twindow.skey, pCond->twindow.ekey, pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qId);
|
||||||
|
}
|
||||||
|
|
||||||
static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pCond, uint64_t qId, SMemRef* pMemRef) {
|
static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pCond, uint64_t qId, SMemRef* pMemRef) {
|
||||||
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
|
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
|
||||||
if (pQueryHandle == NULL) {
|
if (pQueryHandle == NULL) {
|
||||||
goto out_of_memory;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
pQueryHandle->order = pCond->order;
|
pQueryHandle->order = pCond->order;
|
||||||
pQueryHandle->window = pCond->twindow;
|
|
||||||
pQueryHandle->pTsdb = tsdb;
|
pQueryHandle->pTsdb = tsdb;
|
||||||
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
|
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
|
||||||
pQueryHandle->cur.fid = INT32_MIN;
|
pQueryHandle->cur.fid = INT32_MIN;
|
||||||
|
@ -354,36 +392,33 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
|
||||||
pQueryHandle->checkFiles = true;
|
pQueryHandle->checkFiles = true;
|
||||||
pQueryHandle->activeIndex = 0; // current active table index
|
pQueryHandle->activeIndex = 0; // current active table index
|
||||||
pQueryHandle->qId = qId;
|
pQueryHandle->qId = qId;
|
||||||
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
|
|
||||||
pQueryHandle->allocSize = 0;
|
pQueryHandle->allocSize = 0;
|
||||||
pQueryHandle->locateStart = false;
|
pQueryHandle->locateStart = false;
|
||||||
pQueryHandle->pMemRef = pMemRef;
|
pQueryHandle->pMemRef = pMemRef;
|
||||||
|
pQueryHandle->loadType = pCond->type;
|
||||||
|
|
||||||
|
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
|
||||||
pQueryHandle->loadExternalRow = pCond->loadExternalRows;
|
pQueryHandle->loadExternalRow = pCond->loadExternalRows;
|
||||||
pQueryHandle->currentLoadExternalRows = pCond->loadExternalRows;
|
pQueryHandle->currentLoadExternalRows = pCond->loadExternalRows;
|
||||||
|
|
||||||
pQueryHandle->loadType = pCond->type;
|
|
||||||
|
|
||||||
if (tsdbInitReadH(&pQueryHandle->rhelper, (STsdbRepo*)tsdb) != 0) {
|
if (tsdbInitReadH(&pQueryHandle->rhelper, (STsdbRepo*)tsdb) != 0) {
|
||||||
goto out_of_memory;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(pCond != NULL && pMemRef != NULL);
|
assert(pCond != NULL && pMemRef != NULL);
|
||||||
if (ASCENDING_TRAVERSE(pCond->order)) {
|
setQueryTimewindow(pQueryHandle, pCond);
|
||||||
assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey);
|
|
||||||
} else {
|
|
||||||
assert(pQueryHandle->window.skey >= pQueryHandle->window.ekey);
|
|
||||||
}
|
|
||||||
if (pCond->numOfCols > 0) {
|
if (pCond->numOfCols > 0) {
|
||||||
// allocate buffer in order to load data blocks from file
|
// allocate buffer in order to load data blocks from file
|
||||||
pQueryHandle->statis = calloc(pCond->numOfCols, sizeof(SDataStatis));
|
pQueryHandle->statis = calloc(pCond->numOfCols, sizeof(SDataStatis));
|
||||||
if (pQueryHandle->statis == NULL) {
|
if (pQueryHandle->statis == NULL) {
|
||||||
goto out_of_memory;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
pQueryHandle->pColumns =
|
// todo: use list instead of array?
|
||||||
taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array?
|
pQueryHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
|
||||||
if (pQueryHandle->pColumns == NULL) {
|
if (pQueryHandle->pColumns == NULL) {
|
||||||
goto out_of_memory;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||||
|
@ -392,14 +427,16 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
|
||||||
colInfo.info = pCond->colList[i];
|
colInfo.info = pCond->colList[i];
|
||||||
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes);
|
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes);
|
||||||
if (colInfo.pData == NULL) {
|
if (colInfo.pData == NULL) {
|
||||||
goto out_of_memory;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pQueryHandle->pColumns, &colInfo);
|
taosArrayPush(pQueryHandle->pColumns, &colInfo);
|
||||||
pQueryHandle->statis[i].colId = colInfo.info.colId;
|
pQueryHandle->statis[i].colId = colInfo.info.colId;
|
||||||
}
|
}
|
||||||
|
|
||||||
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
|
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
||||||
assert(pMeta != NULL);
|
assert(pMeta != NULL);
|
||||||
|
|
||||||
|
@ -407,7 +444,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
|
||||||
if (pQueryHandle->pDataCols == NULL) {
|
if (pQueryHandle->pDataCols == NULL) {
|
||||||
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pQueryHandle, pQueryHandle->qId);
|
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pQueryHandle, pQueryHandle->qId);
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
goto out_of_memory;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
|
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
|
||||||
|
@ -415,7 +452,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
|
||||||
|
|
||||||
return (TsdbQueryHandleT) pQueryHandle;
|
return (TsdbQueryHandleT) pQueryHandle;
|
||||||
|
|
||||||
out_of_memory:
|
_end:
|
||||||
tsdbCleanupQueryHandle(pQueryHandle);
|
tsdbCleanupQueryHandle(pQueryHandle);
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -864,10 +901,10 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
|
||||||
}
|
}
|
||||||
|
|
||||||
if (key < 0) {
|
if (key < 0) {
|
||||||
key -= (daysPerFile * tsMsPerDay[precision]);
|
key -= (daysPerFile * tsTickPerDay[precision]);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t fid = (int64_t)(key / (daysPerFile * tsMsPerDay[precision])); // set the starting fileId
|
int64_t fid = (int64_t)(key / (daysPerFile * tsTickPerDay[precision])); // set the starting fileId
|
||||||
if (fid < 0L && llabs(fid) > INT32_MAX) { // data value overflow for INT32
|
if (fid < 0L && llabs(fid) > INT32_MAX) { // data value overflow for INT32
|
||||||
fid = INT32_MIN;
|
fid = INT32_MIN;
|
||||||
}
|
}
|
||||||
|
@ -3548,7 +3585,6 @@ int32_t tsdbGetOneTableGroup(STsdbRepo* tsdb, uint64_t uid, TSKEY startKey, STab
|
||||||
taosArrayPush(group, &info);
|
taosArrayPush(group, &info);
|
||||||
|
|
||||||
taosArrayPush(pGroupInfo->pGroupList, &group);
|
taosArrayPush(pGroupInfo->pGroupList, &group);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
|
Loading…
Reference in New Issue