Merge branch 'feature/query' of https://github.com/taosdata/TDengine into feature/query
This commit is contained in:
commit
2fe0304b50
|
@ -31,8 +31,8 @@ extern "C" {
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
#include "qExecutor.h"
|
#include "qExecutor.h"
|
||||||
|
#include "qSqlparser.h"
|
||||||
#include "qTsbuf.h"
|
#include "qTsbuf.h"
|
||||||
#include "qsqlparser.h"
|
|
||||||
#include "tcmdtype.h"
|
#include "tcmdtype.h"
|
||||||
|
|
||||||
// forward declaration
|
// forward declaration
|
||||||
|
|
|
@ -148,7 +148,7 @@ void taos_init_imp() {
|
||||||
refreshTime = refreshTime < 10 ? 10 : refreshTime;
|
refreshTime = refreshTime < 10 ? 10 : refreshTime;
|
||||||
|
|
||||||
if (tscCacheHandle == NULL) {
|
if (tscCacheHandle == NULL) {
|
||||||
tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "client");
|
tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("client is initialized successfully");
|
tscDebug("client is initialized successfully");
|
||||||
|
|
|
@ -20,8 +20,8 @@
|
||||||
#include "hash.h"
|
#include "hash.h"
|
||||||
#include "qFill.h"
|
#include "qFill.h"
|
||||||
#include "qResultbuf.h"
|
#include "qResultbuf.h"
|
||||||
|
#include "qSqlparser.h"
|
||||||
#include "qTsbuf.h"
|
#include "qTsbuf.h"
|
||||||
#include "qsqlparser.h"
|
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
|
|
|
@ -13,8 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef TDENGINE_VNODEQUERYUTIL_H
|
#ifndef TDENGINE_QRESULTBUF_H
|
||||||
#define TDENGINE_VNODEQUERYUTIL_H
|
#define TDENGINE_QRESULTBUF_H
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -26,11 +26,18 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct SArray* SIDList;
|
typedef struct SArray* SIDList;
|
||||||
|
|
||||||
|
typedef struct SPageInfo {
|
||||||
|
int32_t pageId;
|
||||||
|
int32_t offset;
|
||||||
|
int32_t lengthOnDisk;
|
||||||
|
} SPageInfo;
|
||||||
|
|
||||||
typedef struct SDiskbasedResultBuf {
|
typedef struct SDiskbasedResultBuf {
|
||||||
int32_t numOfRowsPerPage;
|
int32_t numOfRowsPerPage;
|
||||||
int32_t numOfPages;
|
int32_t numOfPages;
|
||||||
int64_t totalBufSize;
|
int64_t totalBufSize;
|
||||||
int32_t fd; // data file fd
|
FILE* file;
|
||||||
|
// int32_t fd; // data file fd
|
||||||
int32_t allocateId; // allocated page id
|
int32_t allocateId; // allocated page id
|
||||||
int32_t incStep; // minimum allocated pages
|
int32_t incStep; // minimum allocated pages
|
||||||
void* pBuf; // mmap buffer pointer
|
void* pBuf; // mmap buffer pointer
|
||||||
|
@ -43,6 +50,8 @@ typedef struct SDiskbasedResultBuf {
|
||||||
void* iBuf; // inmemory buf
|
void* iBuf; // inmemory buf
|
||||||
void* handle; // for debug purpose
|
void* handle; // for debug purpose
|
||||||
void* emptyDummyIdList; // dummy id list
|
void* emptyDummyIdList; // dummy id list
|
||||||
|
bool comp;
|
||||||
|
|
||||||
} SDiskbasedResultBuf;
|
} SDiskbasedResultBuf;
|
||||||
|
|
||||||
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L)
|
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L)
|
||||||
|
@ -56,7 +65,7 @@ typedef struct SDiskbasedResultBuf {
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize, int32_t pagesize,
|
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize, int32_t pagesize,
|
||||||
int32_t inMemPages, void* handle);
|
int32_t inMemPages, const void* handle);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -126,4 +135,4 @@ int32_t getLastPageId(SIDList pList);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif // TDENGINE_VNODEQUERYUTIL_H
|
#endif // TDENGINE_QRESULTBUF_H
|
||||||
|
|
|
@ -18,8 +18,8 @@
|
||||||
|
|
||||||
#include "exception.h"
|
#include "exception.h"
|
||||||
#include "qAst.h"
|
#include "qAst.h"
|
||||||
|
#include "qSqlparser.h"
|
||||||
#include "qSyntaxtreefunction.h"
|
#include "qSyntaxtreefunction.h"
|
||||||
#include "qsqlparser.h"
|
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
|
|
|
@ -6617,14 +6617,16 @@ void* qOpenQueryMgmt(int32_t vgId) {
|
||||||
char cacheName[128] = {0};
|
char cacheName[128] = {0};
|
||||||
sprintf(cacheName, "qhandle_%d", vgId);
|
sprintf(cacheName, "qhandle_%d", vgId);
|
||||||
|
|
||||||
SQueryMgmt* pQueryHandle = calloc(1, sizeof(SQueryMgmt));
|
SQueryMgmt* pQueryMgmt = calloc(1, sizeof(SQueryMgmt));
|
||||||
|
|
||||||
pQueryHandle->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
|
pQueryMgmt->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
|
||||||
pQueryHandle->closed = false;
|
pQueryMgmt->closed = false;
|
||||||
pthread_mutex_init(&pQueryHandle->lock, NULL);
|
pQueryMgmt->vgId = vgId;
|
||||||
|
|
||||||
|
pthread_mutex_init(&pQueryMgmt->lock, NULL);
|
||||||
|
|
||||||
qDebug("vgId:%d, open querymgmt success", vgId);
|
qDebug("vgId:%d, open querymgmt success", vgId);
|
||||||
return pQueryHandle;
|
return pQueryMgmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void queryMgmtKillQueryFn(void* handle) {
|
static void queryMgmtKillQueryFn(void* handle) {
|
||||||
|
@ -6664,7 +6666,7 @@ void qCleanupQueryMgmt(void* pQMgmt) {
|
||||||
pthread_mutex_destroy(&pQueryMgmt->lock);
|
pthread_mutex_destroy(&pQueryMgmt->lock);
|
||||||
tfree(pQueryMgmt);
|
tfree(pQueryMgmt);
|
||||||
|
|
||||||
qDebug("vgId:%d querymgmt cleanup completed", vgId);
|
qDebug("vgId:%d queryMgmt cleanup completed", vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "qsqlparser.h"
|
#include "qSqlparser.h"
|
||||||
#include "queryLog.h"
|
#include "queryLog.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
|
||||||
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize,
|
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize,
|
||||||
int32_t pagesize, int32_t inMemPages, void* handle) {
|
int32_t pagesize, int32_t inMemPages, const void* handle) {
|
||||||
|
|
||||||
*pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf));
|
*pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf));
|
||||||
SDiskbasedResultBuf* pResBuf = *pResultBuf;
|
SDiskbasedResultBuf* pResBuf = *pResultBuf;
|
||||||
|
@ -24,6 +24,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
|
||||||
pResBuf->incStep = 4;
|
pResBuf->incStep = 4;
|
||||||
pResBuf->allocateId = -1;
|
pResBuf->allocateId = -1;
|
||||||
|
|
||||||
|
// todo opt perf by on demand create in memory buffer
|
||||||
pResBuf->iBuf = calloc(pResBuf->inMemPages, pResBuf->pageSize);
|
pResBuf->iBuf = calloc(pResBuf->inMemPages, pResBuf->pageSize);
|
||||||
|
|
||||||
// init id hash table
|
// init id hash table
|
||||||
|
@ -31,10 +32,10 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
|
||||||
pResBuf->list = taosArrayInit(numOfPages, POINTER_BYTES);
|
pResBuf->list = taosArrayInit(numOfPages, POINTER_BYTES);
|
||||||
|
|
||||||
char path[PATH_MAX] = {0};
|
char path[PATH_MAX] = {0};
|
||||||
getTmpfilePath("tsdb_qbuf", path);
|
getTmpfilePath("qbuf", path);
|
||||||
pResBuf->path = strdup(path);
|
pResBuf->path = strdup(path);
|
||||||
|
|
||||||
pResBuf->fd = FD_INITIALIZER;
|
pResBuf->file = NULL;
|
||||||
pResBuf->pBuf = NULL;
|
pResBuf->pBuf = NULL;
|
||||||
pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
|
pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
|
||||||
|
|
||||||
|
@ -52,8 +53,9 @@ int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->tota
|
||||||
#define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize)
|
#define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize)
|
||||||
|
|
||||||
static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) {
|
static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) {
|
||||||
pResultBuf->fd = open(pResultBuf->path, O_CREAT | O_RDWR, 0666);
|
// pResultBuf->fd = open(pResultBuf->path, O_CREAT | O_RDWR, 0666);
|
||||||
if (!FD_VALID(pResultBuf->fd)) {
|
pResultBuf->file = fopen(pResultBuf->path, "r+");
|
||||||
|
if (pResultBuf->file == NULL) {
|
||||||
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
|
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
}
|
}
|
||||||
|
@ -61,13 +63,15 @@ static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) {
|
||||||
assert(pResultBuf->numOfPages == pResultBuf->inMemPages);
|
assert(pResultBuf->numOfPages == pResultBuf->inMemPages);
|
||||||
pResultBuf->numOfPages += pResultBuf->incStep;
|
pResultBuf->numOfPages += pResultBuf->incStep;
|
||||||
|
|
||||||
int32_t ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
|
int32_t ret = ftruncate(fileno(pResultBuf->file), NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
|
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
|
pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED,
|
||||||
|
fileno(pResultBuf->file), 0);
|
||||||
|
|
||||||
if (pResultBuf->pBuf == MAP_FAILED) {
|
if (pResultBuf->pBuf == MAP_FAILED) {
|
||||||
qError("QInfo:%p failed to map temp file: %s. %s", pResultBuf->handle, pResultBuf->path, strerror(errno));
|
qError("QInfo:%p failed to map temp file: %s. %s", pResultBuf->handle, pResultBuf->path, strerror(errno));
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
|
@ -82,7 +86,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t incNu
|
||||||
int32_t ret = TSDB_CODE_SUCCESS;
|
int32_t ret = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
if (pResultBuf->pBuf == NULL) {
|
if (pResultBuf->pBuf == NULL) {
|
||||||
assert(pResultBuf->fd == FD_INITIALIZER);
|
assert(pResultBuf->file == NULL);
|
||||||
|
|
||||||
if ((ret = createDiskResidesBuf(pResultBuf)) != TSDB_CODE_SUCCESS) {
|
if ((ret = createDiskResidesBuf(pResultBuf)) != TSDB_CODE_SUCCESS) {
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -95,7 +99,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t incNu
|
||||||
* disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
|
* disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
|
||||||
* be insufficient
|
* be insufficient
|
||||||
*/
|
*/
|
||||||
ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
|
ret = ftruncate(fileno(pResultBuf->file), NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
|
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
|
||||||
// strerror(errno));
|
// strerror(errno));
|
||||||
|
@ -103,7 +107,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t incNu
|
||||||
}
|
}
|
||||||
|
|
||||||
pResultBuf->totalBufSize = pResultBuf->numOfPages * pResultBuf->pageSize;
|
pResultBuf->totalBufSize = pResultBuf->numOfPages * pResultBuf->pageSize;
|
||||||
pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
|
pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, fileno(pResultBuf->file), 0);
|
||||||
|
|
||||||
if (pResultBuf->pBuf == MAP_FAILED) {
|
if (pResultBuf->pBuf == MAP_FAILED) {
|
||||||
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
|
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
|
||||||
|
@ -185,11 +189,11 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (FD_VALID(pResultBuf->fd)) {
|
if (pResultBuf->file != NULL) {
|
||||||
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file created:%s, file size:%d", handle,
|
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file created:%s, file size:%d", handle,
|
||||||
pResultBuf->totalBufSize, pResultBuf->path, FILE_SIZE_ON_DISK(pResultBuf));
|
pResultBuf->totalBufSize, pResultBuf->path, FILE_SIZE_ON_DISK(pResultBuf));
|
||||||
|
|
||||||
close(pResultBuf->fd);
|
fclose(pResultBuf->file);
|
||||||
munmap(pResultBuf->pBuf, FILE_SIZE_ON_DISK(pResultBuf));
|
munmap(pResultBuf->pBuf, FILE_SIZE_ON_DISK(pResultBuf));
|
||||||
pResultBuf->pBuf = NULL;
|
pResultBuf->pBuf = NULL;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -30,7 +30,7 @@
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
#include <stdbool.h>
|
#include <stdbool.h>
|
||||||
#include "qsqlparser.h"
|
#include "qSqlparser.h"
|
||||||
#include "tcmdtype.h"
|
#include "tcmdtype.h"
|
||||||
#include "tstoken.h"
|
#include "tstoken.h"
|
||||||
#include "ttokendef.h"
|
#include "ttokendef.h"
|
||||||
|
|
|
@ -654,8 +654,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
|
||||||
|
|
||||||
int64_t elapsedTime = (taosGetTimestampUs() - st);
|
int64_t elapsedTime = (taosGetTimestampUs() - st);
|
||||||
pQueryHandle->cost.blockLoadTime += elapsedTime;
|
pQueryHandle->cost.blockLoadTime += elapsedTime;
|
||||||
tsdbDebug("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, elapsedTime);
|
|
||||||
|
|
||||||
|
tsdbDebug("%p load file block into buffer, brange:%"PRId64"-%"PRId64" , rows:%d, elapsed time:%"PRId64 " us",
|
||||||
|
pQueryHandle, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime);
|
||||||
return blockLoaded;
|
return blockLoaded;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -971,6 +972,52 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols) {
|
||||||
|
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
||||||
|
if (numOfRows < pQueryHandle->outputCapacity) {
|
||||||
|
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
|
||||||
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
||||||
|
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void getQualifiedRowsPos(STsdbQueryHandle* pQueryHandle, int32_t startPos, int32_t endPos,
|
||||||
|
int32_t numOfExisted, int32_t *start, int32_t *end) {
|
||||||
|
*start = -1;
|
||||||
|
|
||||||
|
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
|
int32_t remain = endPos - startPos + 1;
|
||||||
|
if (remain + numOfExisted > pQueryHandle->outputCapacity) {
|
||||||
|
*end = (pQueryHandle->outputCapacity - numOfExisted) + startPos - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
*start = startPos;
|
||||||
|
} else {
|
||||||
|
int32_t remain = (startPos - endPos) + 1;
|
||||||
|
if (remain + numOfExisted > pQueryHandle->outputCapacity) {
|
||||||
|
*end = startPos + 1 - (pQueryHandle->outputCapacity - numOfExisted);
|
||||||
|
}
|
||||||
|
|
||||||
|
*start = *end;
|
||||||
|
*end = startPos;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void updateInfoAfterMerge(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows, int32_t endPos) {
|
||||||
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||||
|
|
||||||
|
pCheckInfo->lastKey = cur->lastKey;
|
||||||
|
pQueryHandle->realNumOfRows = numOfRows;
|
||||||
|
cur->rows = numOfRows;
|
||||||
|
cur->pos = endPos;
|
||||||
|
}
|
||||||
|
|
||||||
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
|
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
|
||||||
// be included in the query time window will be discarded
|
// be included in the query time window will be discarded
|
||||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
|
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
|
||||||
|
@ -978,7 +1025,10 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
||||||
|
|
||||||
initTableMemIterator(pQueryHandle, pCheckInfo);
|
initTableMemIterator(pQueryHandle, pCheckInfo);
|
||||||
|
|
||||||
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
|
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
|
||||||
|
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX);
|
||||||
|
TSKEY* tsArray = pCols->cols[0].pData;
|
||||||
|
|
||||||
// for search the endPos, so the order needs to reverse
|
// for search the endPos, so the order needs to reverse
|
||||||
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
|
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
|
||||||
|
@ -1004,9 +1054,6 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
// compared with the data from in-memory buffer, to generate the correct timestamp array list
|
// compared with the data from in-memory buffer, to generate the correct timestamp array list
|
||||||
int32_t pos = cur->pos;
|
int32_t pos = cur->pos;
|
||||||
|
|
||||||
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == 0);
|
|
||||||
TSKEY* tsArray = pCols->cols[0].pData;
|
|
||||||
|
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
|
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
|
||||||
|
|
||||||
|
@ -1014,34 +1061,22 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
|
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
|
||||||
int32_t start = cur->pos;
|
int32_t start = cur->pos;
|
||||||
int32_t end = endPos;
|
int32_t end = endPos;
|
||||||
|
|
||||||
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
end = cur->pos;
|
SWAP(start, end, int32_t);
|
||||||
start = endPos;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cur->win.skey = tsArray[start];
|
|
||||||
cur->win.ekey = tsArray[end];
|
|
||||||
|
|
||||||
// todo opt in case of no data in buffer
|
|
||||||
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
|
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
|
||||||
|
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
|
||||||
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
|
||||||
if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) {
|
|
||||||
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
|
||||||
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pos += (end - start + 1) * step;
|
pos += (end - start + 1) * step;
|
||||||
cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
|
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
|
||||||
|
|
||||||
pCheckInfo->lastKey = cur->lastKey;
|
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
||||||
pQueryHandle->realNumOfRows = numOfRows;
|
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
|
||||||
cur->rows = numOfRows;
|
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
|
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
|
||||||
SSkipListNode* node = NULL;
|
SSkipListNode* node = NULL;
|
||||||
|
@ -1088,26 +1123,14 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
moveToNextRowInMem(pCheckInfo);
|
moveToNextRowInMem(pCheckInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t start = -1;
|
int32_t qstart = 0, qend = 0;
|
||||||
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
getQualifiedRowsPos(pQueryHandle, pos, end, numOfRows, &qstart, &qend);
|
||||||
int32_t remain = end - pos + 1;
|
|
||||||
if (remain + numOfRows > pQueryHandle->outputCapacity) {
|
|
||||||
end = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
start = pos;
|
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
|
||||||
} else {
|
pos += (qend - qstart + 1) * step;
|
||||||
int32_t remain = (pos - end) + 1;
|
|
||||||
if (remain + numOfRows > pQueryHandle->outputCapacity) {
|
|
||||||
end = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
|
|
||||||
}
|
|
||||||
|
|
||||||
start = end;
|
cur->win.ekey = tsArray[end];
|
||||||
end = pos;
|
cur->lastKey = cur->win.ekey + step;
|
||||||
}
|
|
||||||
|
|
||||||
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
|
|
||||||
pos += (end - start + 1) * step;
|
|
||||||
}
|
}
|
||||||
} while (numOfRows < pQueryHandle->outputCapacity);
|
} while (numOfRows < pQueryHandle->outputCapacity);
|
||||||
|
|
||||||
|
@ -1124,30 +1147,14 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
cur->win.skey = tsArray[pos];
|
cur->win.skey = tsArray[pos];
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t start = -1;
|
int32_t start = -1, end = -1;
|
||||||
int32_t end = -1;
|
getQualifiedRowsPos(pQueryHandle, pos, endPos, numOfRows, &start, &end);
|
||||||
|
|
||||||
// all remain data are qualified, but check the remain capacity in the first place.
|
|
||||||
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
|
||||||
int32_t remain = endPos - pos + 1;
|
|
||||||
if (remain + numOfRows > pQueryHandle->outputCapacity) {
|
|
||||||
endPos = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
start = pos;
|
|
||||||
end = endPos;
|
|
||||||
} else {
|
|
||||||
int32_t remain = pos + 1;
|
|
||||||
if (remain + numOfRows > pQueryHandle->outputCapacity) {
|
|
||||||
endPos = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
|
|
||||||
}
|
|
||||||
|
|
||||||
start = endPos;
|
|
||||||
end = pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
|
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
|
||||||
pos += (end - start + 1) * step;
|
pos += (end - start + 1) * step;
|
||||||
|
|
||||||
|
cur->win.ekey = tsArray[end];
|
||||||
|
cur->lastKey = cur->win.ekey + step;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1157,21 +1164,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
|
|
||||||
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
SWAP(cur->win.skey, cur->win.ekey, TSKEY);
|
SWAP(cur->win.skey, cur->win.ekey, TSKEY);
|
||||||
|
|
||||||
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
|
||||||
if (numOfRows < pQueryHandle->outputCapacity) {
|
|
||||||
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
|
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
|
||||||
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pCheckInfo->lastKey = cur->lastKey;
|
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
|
||||||
pQueryHandle->realNumOfRows = numOfRows;
|
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
|
||||||
cur->rows = numOfRows;
|
|
||||||
cur->pos = pos;
|
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
|
assert(cur->win.skey >= pQueryHandle->window.skey && cur->win.ekey <= pQueryHandle->window.ekey);
|
||||||
|
} else {
|
||||||
|
assert(cur->win.skey >= pQueryHandle->window.ekey && cur->win.ekey <= pQueryHandle->window.skey);
|
||||||
|
}
|
||||||
|
|
||||||
tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey,
|
tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey,
|
||||||
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
|
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
|
||||||
|
|
|
@ -674,6 +674,7 @@ void* taosCacheTimedRefresh(void *handle) {
|
||||||
|
|
||||||
// check if current cache object will be deleted every 500ms.
|
// check if current cache object will be deleted every 500ms.
|
||||||
if (pCacheObj->deleting) {
|
if (pCacheObj->deleting) {
|
||||||
|
uDebug("%s refresh threads quit", pCacheObj->name);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue