[td-168]
This commit is contained in:
parent
9e280acb7e
commit
079a8f7596
|
@ -131,17 +131,23 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
|
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
|
||||||
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i, pSchema[i].name,
|
char* dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i;
|
||||||
TSDB_COL_NAME_LEN);
|
STR_TO_VARSTR(dst, pSchema[i].name);
|
||||||
|
|
||||||
char *type = tDataTypeDesc[pSchema[i].type].aName;
|
char *type = tDataTypeDesc[pSchema[i].type].aName;
|
||||||
|
|
||||||
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1);
|
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1);
|
||||||
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i, type, pField->bytes);
|
dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i;
|
||||||
|
|
||||||
|
STR_TO_VARSTR(dst, type);
|
||||||
|
|
||||||
int32_t bytes = pSchema[i].bytes;
|
int32_t bytes = pSchema[i].bytes;
|
||||||
if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
|
if (pSchema[i].type == TSDB_DATA_TYPE_BINARY || pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
bytes = bytes / TSDB_NCHAR_SIZE;
|
bytes -= VARSTR_HEADER_SIZE;
|
||||||
|
|
||||||
|
if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
bytes = bytes / TSDB_NCHAR_SIZE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 2);
|
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 2);
|
||||||
|
@ -233,7 +239,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, int32_t typeColLength,
|
static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, int32_t typeColLength,
|
||||||
int32_t noteColLength) {
|
int32_t noteColLength) {
|
||||||
int32_t rowLen = 0;
|
int32_t rowLen = 0;
|
||||||
SColumnIndex index = {0};
|
SColumnIndex index = {0};
|
||||||
|
@ -243,14 +249,14 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||||
pQueryInfo->order.order = TSDB_ORDER_ASC;
|
pQueryInfo->order.order = TSDB_ORDER_ASC;
|
||||||
|
|
||||||
TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_COL_NAME_LEN};
|
TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE};
|
||||||
strncpy(f.name, "Field", TSDB_COL_NAME_LEN);
|
strncpy(f.name, "Field", TSDB_COL_NAME_LEN);
|
||||||
|
|
||||||
SFieldSupInfo* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
|
SFieldSupInfo* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
|
||||||
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, TSDB_COL_NAME_LEN,
|
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
|
||||||
TSDB_COL_NAME_LEN, false);
|
TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE, TSDB_COL_NAME_LEN, false);
|
||||||
|
|
||||||
rowLen += TSDB_COL_NAME_LEN;
|
rowLen += (TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE);
|
||||||
|
|
||||||
f.bytes = typeColLength;
|
f.bytes = typeColLength;
|
||||||
f.type = TSDB_DATA_TYPE_BINARY;
|
f.type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
@ -289,17 +295,16 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
|
||||||
|
|
||||||
assert(tscGetMetaInfo(pQueryInfo, 0)->pTableMeta != NULL);
|
assert(tscGetMetaInfo(pQueryInfo, 0)->pTableMeta != NULL);
|
||||||
|
|
||||||
const int32_t NUM_OF_DESCRIBE_TABLE_COLUMNS = 4;
|
const int32_t NUM_OF_DESC_TABLE_COLUMNS = 4;
|
||||||
const int32_t TYPE_COLUMN_LENGTH = 16;
|
const int32_t TYPE_COLUMN_LENGTH = 16;
|
||||||
const int32_t NOTE_COLUMN_MIN_LENGTH = 8;
|
const int32_t NOTE_COLUMN_MIN_LENGTH = 8;
|
||||||
|
|
||||||
int32_t note_field_length = tscMaxLengthOfTagsFields(pSql);
|
int32_t noteFieldLen = tscMaxLengthOfTagsFields(pSql);
|
||||||
if (note_field_length == 0) {
|
if (noteFieldLen == 0) {
|
||||||
note_field_length = NOTE_COLUMN_MIN_LENGTH;
|
noteFieldLen = NOTE_COLUMN_MIN_LENGTH;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rowLen =
|
int32_t rowLen = tscBuildTableSchemaResultFields(pSql, NUM_OF_DESC_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, noteFieldLen);
|
||||||
tscBuildMeterSchemaResultFields(pSql, NUM_OF_DESCRIBE_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, note_field_length);
|
|
||||||
tscFieldInfoUpdateOffset(pQueryInfo);
|
tscFieldInfoUpdateOffset(pQueryInfo);
|
||||||
return tscSetValueToResObj(pSql, rowLen);
|
return tscSetValueToResObj(pSql, rowLen);
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,17 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define VARSTR_HEADER_SIZE sizeof(int16_t)
|
||||||
|
#define STR_TO_VARSTR(x, str) do {int16_t __len = strlen(str); \
|
||||||
|
*(int16_t*)(x) = __len; \
|
||||||
|
strncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), __len);} while(0);
|
||||||
|
|
||||||
|
#define STR_TO_VARSTR_WITH_SIZE(x, str, _size) do {\
|
||||||
|
int16_t __len = strnlen((str), (_size)); \
|
||||||
|
*(int16_t*)(x) = __len; \
|
||||||
|
strncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), __len);\
|
||||||
|
} while(0);
|
||||||
|
|
||||||
// ----------------- TSDB COLUMN DEFINITION
|
// ----------------- TSDB COLUMN DEFINITION
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type; // Column type
|
int8_t type; // Column type
|
||||||
|
|
|
@ -579,7 +579,11 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
|
||||||
cols = 0;
|
cols = 0;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
strcpy(pWrite, mgmtGetDbStr(pDb->name));
|
char* name = mgmtGetDbStr(pDb->name);
|
||||||
|
*(int16_t*) pWrite = strnlen(name, TSDB_DB_NAME_LEN);
|
||||||
|
pWrite += sizeof(int16_t); // todo refactor
|
||||||
|
|
||||||
|
strncpy(pWrite, mgmtGetDbStr(pDb->name), TSDB_DB_NAME_LEN);
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
|
|
@ -1119,7 +1119,12 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
|
||||||
cols = 0;
|
cols = 0;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
strncpy(pWrite, stableName, TSDB_TABLE_NAME_LEN);
|
|
||||||
|
int16_t len = strnlen(stableName, TSDB_DB_NAME_LEN);
|
||||||
|
*(int16_t*) pWrite = len;
|
||||||
|
pWrite += sizeof(int16_t); // todo refactor
|
||||||
|
|
||||||
|
strncpy(pWrite, stableName, len);
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
@ -2075,7 +2080,12 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
|
||||||
char *pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
char *pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
strncpy(pWrite, tableName, TSDB_TABLE_NAME_LEN);
|
|
||||||
|
int16_t len = strnlen(tableName, TSDB_DB_NAME_LEN);
|
||||||
|
*(int16_t*) pWrite = len;
|
||||||
|
pWrite += sizeof(int16_t); // todo refactor
|
||||||
|
|
||||||
|
strncpy(pWrite, tableName, len);
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
|
|
@ -415,16 +415,11 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
|
||||||
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
|
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
|
||||||
|
|
||||||
if (pCheckInfo->pDataCols == NULL) {
|
if (pCheckInfo->pDataCols == NULL) {
|
||||||
pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096);
|
pCheckInfo->pDataCols = tdNewDataCols(1000, 100, 4096); //todo fix me
|
||||||
}
|
}
|
||||||
|
|
||||||
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj));
|
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj));
|
||||||
|
|
||||||
// SFile* pFile = &pQueryHandle->pFileGroup->files[TSDB_FILE_TYPE_DATA];
|
|
||||||
// if (pFile->fd == FD_INITIALIZER) {
|
|
||||||
// pFile->fd = open(pFile->fname, O_RDONLY);
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, NULL) == 0) {
|
if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, NULL) == 0) {
|
||||||
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
|
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
|
||||||
|
|
||||||
|
@ -601,27 +596,27 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
|
||||||
// }
|
// }
|
||||||
|
|
||||||
// move the data block in the front to data block if needed
|
// move the data block in the front to data block if needed
|
||||||
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
|
int32_t numOfCols = pQueryHandle->rhelper.pDataCols[0]->numOfCols;
|
||||||
|
int32_t reqCols = taosArrayGetSize(pQueryHandle->pColumns);
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(sa); ++i) {
|
for (int32_t i = 0; i < reqCols; ++i) {
|
||||||
int16_t colId = *(int16_t*)taosArrayGet(sa, i);
|
// int16_t colId = *(int16_t*)taosArrayGet(sa, i);
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
|
||||||
|
int32_t bytes = pCol->info.bytes;
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfCols; ++j) {
|
for (int32_t j = 0; j < numOfCols; ++j) {
|
||||||
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, j);
|
SDataCol* src = &pQueryHandle->rhelper.pDataCols[0]->cols[j];
|
||||||
int32_t bytes = pCol->info.bytes;
|
|
||||||
|
|
||||||
if (pCol->info.colId == colId) {
|
if (pCol->info.colId == src->colId) {
|
||||||
if (pCol->info.type != TSDB_DATA_TYPE_BINARY && pCol->info.type != TSDB_DATA_TYPE_NCHAR) {
|
if (pCol->info.type != TSDB_DATA_TYPE_BINARY && pCol->info.type != TSDB_DATA_TYPE_NCHAR) {
|
||||||
memmove(pCol->pData, pQueryHandle->rhelper.pDataCols[0]->cols[i].pData + bytes * start,
|
memmove(pCol->pData, src->pData + bytes * start, bytes * pQueryHandle->realNumOfRows);
|
||||||
bytes * pQueryHandle->realNumOfRows);
|
} else { // handle the var-string
|
||||||
} else {
|
|
||||||
SDataCol* src = &pQueryHandle->rhelper.pDataCols[0]->cols[i];
|
|
||||||
|
|
||||||
for(int32_t k = start; k < pQueryHandle->realNumOfRows + start; ++k) {
|
for(int32_t k = start; k < pQueryHandle->realNumOfRows + start; ++k) {
|
||||||
char* p = tdGetColDataOfRow(src, k);
|
char* p = tdGetColDataOfRow(src, k);
|
||||||
memcpy(pCol->pData + k * bytes, p, *(int16_t*) p + sizeof(int16_t)); // todo refactor
|
memcpy(pCol->pData + k * bytes, p, varDataTLen(p)); // todo refactor
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1010,7 +1005,7 @@ void changeQueryHandleForQuery(TsdbQueryHandleT pqHandle) {
|
||||||
pQueryHandle->window = (STimeWindow) {key, key};
|
pQueryHandle->window = (STimeWindow) {key, key};
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
|
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
|
||||||
STsdbQueryHandle* pQueryHandle) {
|
STsdbQueryHandle* pQueryHandle) {
|
||||||
int numOfRows = 0;
|
int numOfRows = 0;
|
||||||
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
||||||
|
@ -1040,9 +1035,12 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max
|
||||||
|
|
||||||
*ekey = dataRowKey(row);
|
*ekey = dataRowKey(row);
|
||||||
|
|
||||||
int32_t offset = 0;
|
int32_t offset = -1;
|
||||||
char* pData = NULL;
|
char* pData = NULL;
|
||||||
|
|
||||||
|
STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pTable);
|
||||||
|
int32_t numOfTableCols = schemaNCols(pSchema);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
||||||
|
|
||||||
|
@ -1052,6 +1050,15 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max
|
||||||
pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
|
pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for(int32_t j = 0; j < numOfTableCols; ++j) {
|
||||||
|
if (pColInfo->info.colId == pSchema->columns[j].colId) {
|
||||||
|
offset = pSchema->columns[j].offset;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(offset != -1); // todo handle error
|
||||||
|
|
||||||
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
|
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
void *value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + offset);
|
void *value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + offset);
|
||||||
memcpy(pData, value, varDataTLen(value));
|
memcpy(pData, value, varDataTLen(value));
|
||||||
|
@ -1127,7 +1134,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
|
||||||
if (pTable->mem != NULL) {
|
if (pTable->mem != NULL) {
|
||||||
// create mem table iterator if it is not created yet
|
// create mem table iterator if it is not created yet
|
||||||
assert(pCheckInfo->iter != NULL);
|
assert(pCheckInfo->iter != NULL);
|
||||||
rows = tsdbReadRowsFromCache(pCheckInfo->iter, pHandle->window.ekey, 4000, &skey, &ekey, pHandle);
|
rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey, 4000, &skey, &ekey, pHandle);
|
||||||
|
|
||||||
// update the last key value
|
// update the last key value
|
||||||
pCheckInfo->lastKey = ekey + step;
|
pCheckInfo->lastKey = ekey + step;
|
||||||
|
|
|
@ -75,11 +75,13 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
doQuery(taos, "create database if not exists test");
|
doQuery(taos, "create database if not exists test");
|
||||||
doQuery(taos, "use test");
|
doQuery(taos, "use test");
|
||||||
|
doQuery(taos, "select * from t1 order by ts desc");
|
||||||
|
|
||||||
// doQuery(taos, "create table t1(ts timestamp, k binary(12), f nchar(2))");
|
// doQuery(taos, "create table t1(ts timestamp, k binary(12), f nchar(2))");
|
||||||
for(int32_t i = 0; i< 100000; ++i) {
|
// for(int32_t i = 0; i< 100000; ++i) {
|
||||||
doQuery(taos, "select m1.ts,m1.a from m1, m2 where m1.ts=m2.ts and m1.a=m2.b;");
|
// doQuery(taos, "select m1.ts,m1.a from m1, m2 where m1.ts=m2.ts and m1.a=m2.b;");
|
||||||
usleep(500000);
|
// usleep(500000);
|
||||||
}
|
// }
|
||||||
|
|
||||||
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 'abc')");
|
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 'abc')");
|
||||||
// doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);");
|
// doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);");
|
||||||
|
|
Loading…
Reference in New Issue