Merge pull request #2703 from taosdata/feature/query
[td-225] fix bugs in first/last query
This commit is contained in:
commit
e633174a84
|
@ -2236,6 +2236,27 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doSetInitialTimewindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) {
|
||||||
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
|
if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) {
|
||||||
|
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||||
|
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
|
|
||||||
|
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
|
getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, pBlockInfo->window.skey, pQuery->window.ekey, &w);
|
||||||
|
pWindowResInfo->startTime = w.skey;
|
||||||
|
pWindowResInfo->prevSKey = w.skey;
|
||||||
|
} else {
|
||||||
|
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
|
||||||
|
getAlignQueryTimeWindow(pQuery, pBlockInfo->window.ekey, pQuery->window.ekey, pBlockInfo->window.ekey, &w);
|
||||||
|
|
||||||
|
pWindowResInfo->startTime = pQuery->window.skey;
|
||||||
|
pWindowResInfo->prevSKey = w.skey;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
STableQueryInfo* pTableQueryInfo = pQuery->current;
|
STableQueryInfo* pTableQueryInfo = pQuery->current;
|
||||||
|
@ -2263,24 +2284,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
|
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
|
||||||
|
doSetInitialTimewindow(pRuntimeEnv, &blockInfo);
|
||||||
// todo extract methods
|
|
||||||
if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) {
|
|
||||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
|
||||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
|
||||||
|
|
||||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
|
||||||
getAlignQueryTimeWindow(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &w);
|
|
||||||
pWindowResInfo->startTime = w.skey;
|
|
||||||
pWindowResInfo->prevSKey = w.skey;
|
|
||||||
} else {
|
|
||||||
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
|
|
||||||
getAlignQueryTimeWindow(pQuery, blockInfo.window.ekey, pQuery->window.ekey, blockInfo.window.ekey, &w);
|
|
||||||
|
|
||||||
pWindowResInfo->startTime = pQuery->window.skey;
|
|
||||||
pWindowResInfo->prevSKey = w.skey;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
|
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
|
||||||
ensureOutputBuffer(pRuntimeEnv, &blockInfo);
|
ensureOutputBuffer(pRuntimeEnv, &blockInfo);
|
||||||
|
@ -2314,7 +2318,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
if (QUERY_IS_INTERVAL_QUERY(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) {
|
if (QUERY_IS_INTERVAL_QUERY(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) {
|
||||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||||
closeAllTimeWindow(&pRuntimeEnv->windowResInfo);
|
closeAllTimeWindow(&pRuntimeEnv->windowResInfo);
|
||||||
// removeRedundantWindow(&pRuntimeEnv->windowResInfo, pTableQueryInfo->lastKey - step, step);
|
|
||||||
pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window
|
pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window
|
||||||
} else {
|
} else {
|
||||||
assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
|
assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
|
||||||
|
@ -3223,6 +3226,13 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
|
||||||
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
||||||
|
|
||||||
SWITCH_ORDER(pQuery->order.order);
|
SWITCH_ORDER(pQuery->order.order);
|
||||||
|
|
||||||
|
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
|
assert(pQuery->window.skey <= pQuery->window.ekey);
|
||||||
|
} else {
|
||||||
|
assert(pQuery->window.skey >= pQuery->window.ekey);
|
||||||
|
}
|
||||||
|
|
||||||
SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
|
SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
|
||||||
|
|
||||||
STsdbQueryCond cond = {
|
STsdbQueryCond cond = {
|
||||||
|
@ -3262,8 +3272,7 @@ static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus
|
||||||
|
|
||||||
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
|
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
|
||||||
|
|
||||||
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query
|
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during reverse scan
|
||||||
// during reverse scan
|
|
||||||
pTableQueryInfo->lastKey = pStatus->lastKey;
|
pTableQueryInfo->lastKey = pStatus->lastKey;
|
||||||
pQuery->status = pStatus->status;
|
pQuery->status = pStatus->status;
|
||||||
|
|
||||||
|
@ -3289,7 +3298,12 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
||||||
|
|
||||||
if (pRuntimeEnv->scanFlag == MASTER_SCAN) {
|
if (pRuntimeEnv->scanFlag == MASTER_SCAN) {
|
||||||
qstatus.status = pQuery->status;
|
qstatus.status = pQuery->status;
|
||||||
qstatus.curWindow.ekey = pTableQueryInfo->lastKey - step;
|
|
||||||
|
// do nothing if no data blocks are found qualified during scan
|
||||||
|
if (qstatus.lastKey != pTableQueryInfo->lastKey) {
|
||||||
|
qstatus.curWindow.ekey = pTableQueryInfo->lastKey - step;
|
||||||
|
}
|
||||||
|
|
||||||
qstatus.lastKey = pTableQueryInfo->lastKey;
|
qstatus.lastKey = pTableQueryInfo->lastKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6282,7 +6296,7 @@ void qTableQuery(qinfo_t qinfo) {
|
||||||
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
|
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
|
||||||
assert(pQInfo->runtimeEnv.pQueryHandle == NULL);
|
assert(pQInfo->runtimeEnv.pQueryHandle == NULL);
|
||||||
buildTagQueryResult(pQInfo); // todo support the limit/offset
|
buildTagQueryResult(pQInfo);
|
||||||
} else if (pQInfo->runtimeEnv.stableQuery) {
|
} else if (pQInfo->runtimeEnv.stableQuery) {
|
||||||
stableQueryImpl(pQInfo);
|
stableQueryImpl(pQInfo);
|
||||||
} else {
|
} else {
|
||||||
|
@ -6403,6 +6417,22 @@ int32_t qKillQuery(qinfo_t qinfo) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes) {
|
||||||
|
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
if (val == NULL) {
|
||||||
|
setVardataNull(output, type);
|
||||||
|
} else {
|
||||||
|
memcpy(output, val, varDataTLen(val));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (val == NULL) {
|
||||||
|
setNull(output, type, bytes);
|
||||||
|
} else { // todo here stop will cause client crash
|
||||||
|
memcpy(output, val, bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void buildTagQueryResult(SQInfo* pQInfo) {
|
static void buildTagQueryResult(SQInfo* pQInfo) {
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
|
@ -6459,25 +6489,11 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
||||||
output += sizeof(pQInfo->vgId);
|
output += sizeof(pQInfo->vgId);
|
||||||
|
|
||||||
if (pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
if (pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||||
char *data = tsdbGetTableName(item->pTable);
|
char* data = tsdbGetTableName(item->pTable);
|
||||||
memcpy(output, data, varDataTLen(data));
|
memcpy(output, data, varDataTLen(data));
|
||||||
} else {
|
} else {
|
||||||
char *val = tsdbGetTableTagVal(item->pTable, pExprInfo->base.colInfo.colId, type, bytes);
|
char* data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.colInfo.colId, type, bytes);
|
||||||
|
doSetTagValueToResultBuf(output, data, type, bytes);
|
||||||
// todo refactor
|
|
||||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
if (val == NULL) {
|
|
||||||
setVardataNull(output, type);
|
|
||||||
} else {
|
|
||||||
memcpy(output, val, varDataTLen(val));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (val == NULL) {
|
|
||||||
setNull(output, type, bytes);
|
|
||||||
} else { // todo here stop will cause client crash
|
|
||||||
memcpy(output, val, bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
count += 1;
|
count += 1;
|
||||||
|
@ -6494,38 +6510,44 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
||||||
} else { // return only the tags|table name etc.
|
} else { // return only the tags|table name etc.
|
||||||
count = 0;
|
count = 0;
|
||||||
SSchema tbnameSchema = tGetTableNameColumnSchema();
|
SSchema tbnameSchema = tGetTableNameColumnSchema();
|
||||||
while(pQInfo->tableIndex < num && count < pQuery->rec.capacity) {
|
|
||||||
|
int32_t maxNumOfTables = pQuery->rec.capacity;
|
||||||
|
if (pQuery->limit.limit >= 0 && pQuery->limit.limit < pQuery->rec.capacity) {
|
||||||
|
maxNumOfTables = pQuery->limit.limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
while(pQInfo->tableIndex < num && count < maxNumOfTables) {
|
||||||
int32_t i = pQInfo->tableIndex++;
|
int32_t i = pQInfo->tableIndex++;
|
||||||
|
|
||||||
|
// discard current result due to offset
|
||||||
|
if (pQuery->limit.offset > 0) {
|
||||||
|
pQuery->limit.offset -= 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
SExprInfo* pExprInfo = pQuery->pSelectExpr;
|
SExprInfo* pExprInfo = pQuery->pSelectExpr;
|
||||||
STableQueryInfo* item = taosArrayGetP(pa, i);
|
STableQueryInfo* item = taosArrayGetP(pa, i);
|
||||||
|
|
||||||
|
char *data = NULL, *dst = NULL;
|
||||||
|
int16_t type = 0, bytes = 0;
|
||||||
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||||
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
|
||||||
char* data = tsdbGetTableName(item->pTable);
|
|
||||||
char* dst = pQuery->sdata[j]->data + count * tbnameSchema.bytes;
|
|
||||||
memcpy(dst, data, varDataTLen(data));
|
|
||||||
} else {// todo refactor
|
|
||||||
int16_t type = pExprInfo[j].type;
|
|
||||||
int16_t bytes = pExprInfo[j].bytes;
|
|
||||||
|
|
||||||
char* data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type, bytes);
|
|
||||||
char* dst = pQuery->sdata[j]->data + count * pExprInfo[j].bytes;
|
|
||||||
|
|
||||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||||
if (data == NULL) {
|
bytes = tbnameSchema.bytes;
|
||||||
setVardataNull(dst, type);
|
type = tbnameSchema.type;
|
||||||
} else {
|
|
||||||
memcpy(dst, data, varDataTLen(data));
|
data = tsdbGetTableName(item->pTable);
|
||||||
}
|
dst = pQuery->sdata[j]->data + count * tbnameSchema.bytes;
|
||||||
} else {
|
} else {
|
||||||
if (data == NULL) {
|
type = pExprInfo[j].type;
|
||||||
setNull(dst, type, bytes);
|
bytes = pExprInfo[j].bytes;
|
||||||
} else {
|
|
||||||
memcpy(dst, data, pExprInfo[j].bytes);
|
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type, bytes);
|
||||||
}
|
dst = pQuery->sdata[j]->data + count * pExprInfo[j].bytes;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
doSetTagValueToResultBuf(dst, data, type, bytes);
|
||||||
}
|
}
|
||||||
count += 1;
|
count += 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,3 +126,14 @@ endi
|
||||||
if $data01 != 0 then
|
if $data01 != 0 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
#add check for out of range first/last query
|
||||||
|
sql select first(ts),last(ts) from first_tb4 where ts>'2018-9-18 1:40:01';
|
||||||
|
if $row != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select first(ts),last(ts) from first_tb4 where ts<'2018-9-17 8:50:0';
|
||||||
|
if $row != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
|
@ -105,6 +105,21 @@ if $data03 != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql select tag1 from st2 limit 20 offset 1
|
||||||
|
if $rows != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select tag1 from st2 limit 10 offset 2
|
||||||
|
if $rows != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select tag1 from st2 limit 0 offset 0
|
||||||
|
if $rows != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
sql create table st3 using mt2 tags (NULL, 'ABC', 103, 'FALSE')
|
sql create table st3 using mt2 tags (NULL, 'ABC', 103, 'FALSE')
|
||||||
sql select tag1, tag2, tag3, tag5 from st3
|
sql select tag1, tag2, tag3, tag5 from st3
|
||||||
if $rows != 1 then
|
if $rows != 1 then
|
||||||
|
|
Loading…
Reference in New Issue