[td-168] fix bug in handling the new string
This commit is contained in:
parent
187c7a2faa
commit
9cdc251d3d
|
@ -1844,6 +1844,7 @@ static void last_row_function(SQLFunctionCtx *pCtx) {
|
|||
assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType);
|
||||
|
||||
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
pResInfo->hasResult = DATA_SET_FLAG;
|
||||
|
||||
SLastrowInfo *pInfo = (SLastrowInfo *)pResInfo->interResultBuf;
|
||||
pInfo->ts = pCtx->param[0].i64Key;
|
||||
|
@ -1863,14 +1864,17 @@ static void last_row_function(SQLFunctionCtx *pCtx) {
|
|||
|
||||
static void last_row_finalizer(SQLFunctionCtx *pCtx) {
|
||||
// do nothing at the first stage
|
||||
if (pCtx->currentStage == SECONDARY_STAGE_MERGE) {
|
||||
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
if (pCtx->currentStage == SECONDARY_STAGE_MERGE) {
|
||||
if (pResInfo->hasResult != DATA_SET_FLAG) {
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// do nothing
|
||||
if (pResInfo->hasResult != DATA_SET_FLAG) {
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
GET_RES_INFO(pCtx)->numOfRes = 1;
|
||||
|
|
|
@ -132,7 +132,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
|
|||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
|
||||
char* dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i;
|
||||
STR_TO_VARSTR(dst, pSchema[i].name);
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(dst, pSchema[i].name, TSDB_COL_NAME_LEN);
|
||||
|
||||
char *type = tDataTypeDesc[pSchema[i].type].aName;
|
||||
|
||||
|
@ -155,8 +155,8 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
|
|||
|
||||
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 3);
|
||||
if (i >= tscGetNumOfColumns(pMeta) && tscGetNumOfTags(pMeta) != 0) {
|
||||
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i, "tag",
|
||||
strlen("tag") + 1);
|
||||
char* output = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i;
|
||||
STR_WITH_SIZE_TO_VARSTR(output, "TAG", 3);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -169,13 +169,15 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
|
|||
for (int32_t i = numOfRows; i < totalNumOfRows; ++i) {
|
||||
// field name
|
||||
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
|
||||
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i, pSchema[i].name,
|
||||
TSDB_COL_NAME_LEN);
|
||||
char* output = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i;
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(output, pSchema[i].name, TSDB_COL_NAME_LEN);
|
||||
|
||||
// type name
|
||||
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1);
|
||||
char *type = tDataTypeDesc[pSchema[i].type].aName;
|
||||
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i, type, pField->bytes);
|
||||
|
||||
output = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i;
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(output, type, pField->bytes);
|
||||
|
||||
// type length
|
||||
int32_t bytes = pSchema[i].bytes;
|
||||
|
@ -189,49 +191,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
|
|||
// tag value
|
||||
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 3);
|
||||
char *target = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i;
|
||||
|
||||
if (isNull(pTagValue, pSchema[i].type)) {
|
||||
sprintf(target, "%s", TSDB_DATA_NULL_STR);
|
||||
} else {
|
||||
switch (pSchema[i].type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
/* binary are not null-terminated string */
|
||||
strncpy(target, pTagValue, pSchema[i].bytes);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
taosUcs4ToMbs(pTagValue, pSchema[i].bytes, target);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
float fv = 0;
|
||||
fv = GET_FLOAT_VAL(pTagValue);
|
||||
sprintf(target, "%f", fv);
|
||||
} break;
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
double dv = 0;
|
||||
dv = GET_DOUBLE_VAL(pTagValue);
|
||||
sprintf(target, "%lf", dv);
|
||||
} break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
sprintf(target, "%d", *(int8_t *)pTagValue);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
sprintf(target, "%d", *(int16_t *)pTagValue);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
sprintf(target, "%d", *(int32_t *)pTagValue);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
sprintf(target, "%" PRId64 "", *(int64_t *)pTagValue);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL: {
|
||||
char *val = (*((int8_t *)pTagValue) == 0) ? "false" : "true";
|
||||
sprintf(target, "%s", val);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
STR_WITH_SIZE_TO_VARSTR(target, "TAG", 3);
|
||||
|
||||
pTagValue += pSchema[i].bytes;
|
||||
}
|
||||
|
|
|
@ -1427,7 +1427,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI
|
|||
}
|
||||
|
||||
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_TABLE_NAME_LEN};
|
||||
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE};
|
||||
strcpy(colSchema.name, TSQL_TBNAME_L);
|
||||
|
||||
pQueryInfo->type = TSDB_QUERY_TYPE_STABLE_QUERY;
|
||||
|
|
|
@ -26,15 +26,18 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define VARSTR_HEADER_SIZE sizeof(int16_t)
|
||||
#define STR_TO_VARSTR(x, str) do {int16_t __len = strlen(str); \
|
||||
*(int16_t*)(x) = __len; \
|
||||
#define STR_TO_VARSTR(x, str) do {VarDataLenT __len = strlen(str); \
|
||||
*(VarDataLenT*)(x) = __len; \
|
||||
strncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), __len);} while(0);
|
||||
|
||||
#define STR_TO_VARSTR_WITH_SIZE(x, str, _size) do {\
|
||||
int16_t __len = strnlen((str), (_size)); \
|
||||
*(int16_t*)(x) = __len; \
|
||||
strncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), __len);\
|
||||
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) do {\
|
||||
char* _e = stpncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), (_maxs));\
|
||||
*(VarDataLenT*)(x) = _e - (x);\
|
||||
} while(0)
|
||||
|
||||
#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) do {\
|
||||
*(VarDataLenT*)(x) = (_size); \
|
||||
strncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), (_size));\
|
||||
} while(0);
|
||||
|
||||
// ----------------- TSDB COLUMN DEFINITION
|
||||
|
|
|
@ -228,7 +228,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
|||
int32_t num = taosGetQueueNumber(pWorker->qset);
|
||||
|
||||
if (num > 0) {
|
||||
usleep(30000);
|
||||
usleep(30);
|
||||
sched_yield();
|
||||
} else {
|
||||
taosFreeQall(pWorker->qall);
|
||||
|
|
|
@ -43,6 +43,7 @@ typedef int16_t VarDataLenT;
|
|||
|
||||
// this data type is internally used only in 'in' query to hold the values
|
||||
#define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1)
|
||||
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
|
||||
|
||||
// Bytes for each type.
|
||||
extern const int32_t TYPE_BYTES[11];
|
||||
|
|
|
@ -66,6 +66,7 @@ typedef struct SMnodeObj {
|
|||
SDnodeObj *pDnode;
|
||||
} SMnodeObj;
|
||||
|
||||
// todo use dynamic length string
|
||||
typedef struct {
|
||||
char tableId[TSDB_TABLE_ID_LEN + 1];
|
||||
int8_t type;
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "ttime.h"
|
||||
#include "tname.h"
|
||||
#include "tbalance.h"
|
||||
#include "tdataformat.h"
|
||||
#include "mgmtDef.h"
|
||||
#include "mgmtLog.h"
|
||||
#include "mgmtAcct.h"
|
||||
|
@ -431,7 +432,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
|
|||
SUserObj *pUser = mgmtGetUserFromConn(pConn);
|
||||
if (pUser == NULL) return 0;
|
||||
|
||||
pShow->bytes[cols] = TSDB_DB_NAME_LEN;
|
||||
pShow->bytes[cols] = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "name");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
|
@ -439,7 +440,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
|
|||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "create time");
|
||||
strcpy(pSchema[cols].name, "created_time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
|
@ -586,11 +587,9 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
|
|||
cols = 0;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
char* name = mgmtGetDbStr(pDb->name);
|
||||
*(int16_t*) pWrite = strnlen(name, TSDB_DB_NAME_LEN);
|
||||
pWrite += sizeof(int16_t); // todo refactor
|
||||
|
||||
strncpy(pWrite, mgmtGetDbStr(pDb->name), TSDB_DB_NAME_LEN);
|
||||
char* name = mgmtGetDbStr(pDb->name);
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, TSDB_DB_NAME_LEN);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
|
@ -626,7 +625,10 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
|
|||
#endif
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
sprintf(pWrite, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep);
|
||||
|
||||
char tmp[128] = {0};
|
||||
size_t n = sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep);
|
||||
STR_WITH_SIZE_TO_VARSTR(pWrite, tmp, n);
|
||||
cols++;
|
||||
|
||||
#ifndef __CLOUD_VERSION__
|
||||
|
@ -674,7 +676,11 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
|
|||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
strcpy(pWrite, pDb->status != TSDB_DB_STATUS_READY ? "dropping" : "ready");
|
||||
if (pDb->status == TSDB_DB_STATUS_READY) {
|
||||
STR_WITH_SIZE_TO_VARSTR(pWrite, "ready", 5);
|
||||
} else {
|
||||
STR_WITH_SIZE_TO_VARSTR(pWrite, "dropping", 8);
|
||||
}
|
||||
cols++;
|
||||
|
||||
numOfRows++;
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
#include "mgmtUser.h"
|
||||
#include "mgmtVgroup.h"
|
||||
#include "tcompare.h"
|
||||
#include "tdataformat.h"
|
||||
|
||||
static void * tsChildTableSdb;
|
||||
static void * tsSuperTableSdb;
|
||||
|
@ -624,6 +625,7 @@ void mgmtCleanUpTables() {
|
|||
mgmtCleanUpSuperTables();
|
||||
}
|
||||
|
||||
// todo move to name.h, add length of table name
|
||||
static void mgmtExtractTableName(char* tableId, char* name) {
|
||||
int pos = -1;
|
||||
int num = 0;
|
||||
|
@ -1056,7 +1058,7 @@ static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow,
|
|||
int32_t cols = 0;
|
||||
SSchema *pSchema = pMeta->schema;
|
||||
|
||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
|
||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "name");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
|
@ -1064,7 +1066,7 @@ static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow,
|
|||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "create_time");
|
||||
strcpy(pSchema[cols].name, "created_time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
|
@ -2014,15 +2016,15 @@ static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
|
|||
int32_t cols = 0;
|
||||
SSchema *pSchema = pMeta->schema;
|
||||
|
||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
|
||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "table name");
|
||||
strcpy(pSchema[cols].name, "table_name");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "create time");
|
||||
strcpy(pSchema[cols].name, "created_time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
|
@ -2032,9 +2034,9 @@ static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
|
|||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
|
||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "stable name");
|
||||
strcpy(pSchema[cols].name, "stable_name");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
|
@ -2098,11 +2100,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
|
|||
|
||||
char *pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
|
||||
int16_t len = strnlen(tableName, TSDB_DB_NAME_LEN);
|
||||
*(int16_t*) pWrite = len;
|
||||
pWrite += sizeof(int16_t); // todo refactor
|
||||
|
||||
strncpy(pWrite, tableName, len);
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, tableName, TSDB_TABLE_NAME_LEN);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
|
@ -2119,9 +2117,13 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
|
|||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
|
||||
memset(tableName, 0, tListLen(tableName));
|
||||
if (pTable->info.type == TSDB_CHILD_TABLE) {
|
||||
mgmtExtractTableName(pTable->superTable->info.tableId, pWrite);
|
||||
mgmtExtractTableName(pTable->superTable->info.tableId, tableName);
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, tableName, TSDB_TABLE_NAME_LEN);
|
||||
}
|
||||
|
||||
cols++;
|
||||
|
||||
numOfRows++;
|
||||
|
|
|
@ -48,7 +48,7 @@ typedef struct STSElem {
|
|||
} STSElem;
|
||||
|
||||
typedef struct STSCursor {
|
||||
int32_t vnodeIndex;
|
||||
int32_t vgroupIndex;
|
||||
int32_t blockIndex;
|
||||
int32_t tsIndex;
|
||||
uint32_t order;
|
||||
|
|
|
@ -482,7 +482,7 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex
|
|||
}
|
||||
|
||||
STSCursor* pCur = &pTSBuf->cur;
|
||||
if (pCur->vnodeIndex == vnodeIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) ||
|
||||
if (pCur->vgroupIndex == vnodeIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) ||
|
||||
(pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC))) {
|
||||
int32_t i = 0;
|
||||
bool decomp = false;
|
||||
|
@ -517,7 +517,7 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex
|
|||
|
||||
assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len));
|
||||
|
||||
pCur->vnodeIndex = vnodeIndex;
|
||||
pCur->vgroupIndex = vnodeIndex;
|
||||
pCur->blockIndex = blockIndex;
|
||||
|
||||
pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1;
|
||||
|
@ -554,7 +554,7 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
|
|||
STSCursor* pCur = &pTSBuf->cur;
|
||||
|
||||
// get the first/last position according to traverse order
|
||||
if (pCur->vnodeIndex == -1) {
|
||||
if (pCur->vgroupIndex == -1) {
|
||||
if (pCur->order == TSDB_ORDER_ASC) {
|
||||
tsBufGetBlock(pTSBuf, 0, 0);
|
||||
|
||||
|
@ -569,9 +569,9 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
|
|||
assert(pTSBuf->numOfVnodes > 0);
|
||||
|
||||
int32_t vnodeIndex = pTSBuf->numOfVnodes - 1;
|
||||
pCur->vnodeIndex = vnodeIndex;
|
||||
pCur->vgroupIndex = vnodeIndex;
|
||||
|
||||
int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
|
||||
int32_t vnodeId = pTSBuf->pData[pCur->vgroupIndex].info.vnode;
|
||||
STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId);
|
||||
int32_t blockIndex = pBlockInfo->numOfBlocks - 1;
|
||||
|
||||
|
@ -594,14 +594,14 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
|
|||
|
||||
if ((pCur->order == TSDB_ORDER_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) ||
|
||||
(pCur->order == TSDB_ORDER_DESC && pCur->tsIndex <= 0)) {
|
||||
int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
|
||||
int32_t vnodeId = pTSBuf->pData[pCur->vgroupIndex].info.vnode;
|
||||
|
||||
STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId);
|
||||
if (pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSDB_ORDER_ASC) ||
|
||||
(pCur->blockIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
|
||||
if ((pCur->vnodeIndex >= pTSBuf->numOfVnodes - 1 && pCur->order == TSDB_ORDER_ASC) ||
|
||||
(pCur->vnodeIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
|
||||
pCur->vnodeIndex = -1;
|
||||
if ((pCur->vgroupIndex >= pTSBuf->numOfVnodes - 1 && pCur->order == TSDB_ORDER_ASC) ||
|
||||
(pCur->vgroupIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
|
||||
pCur->vgroupIndex = -1;
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -610,11 +610,11 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
|
|||
}
|
||||
|
||||
int32_t blockIndex = pCur->order == TSDB_ORDER_ASC ? 0 : pBlockInfo->numOfBlocks - 1;
|
||||
tsBufGetBlock(pTSBuf, pCur->vnodeIndex + step, blockIndex);
|
||||
tsBufGetBlock(pTSBuf, pCur->vgroupIndex + step, blockIndex);
|
||||
break;
|
||||
|
||||
} else {
|
||||
tsBufGetBlock(pTSBuf, pCur->vnodeIndex, pCur->blockIndex + step);
|
||||
tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex + step);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
|
@ -631,7 +631,7 @@ void tsBufResetPos(STSBuf* pTSBuf) {
|
|||
return;
|
||||
}
|
||||
|
||||
pTSBuf->cur = (STSCursor){.tsIndex = -1, .blockIndex = -1, .vnodeIndex = -1, .order = pTSBuf->cur.order};
|
||||
pTSBuf->cur = (STSCursor){.tsIndex = -1, .blockIndex = -1, .vgroupIndex = -1, .order = pTSBuf->cur.order};
|
||||
}
|
||||
|
||||
STSElem tsBufGetElem(STSBuf* pTSBuf) {
|
||||
|
@ -642,13 +642,13 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
|
|||
}
|
||||
|
||||
STSCursor* pCur = &pTSBuf->cur;
|
||||
if (pCur != NULL && pCur->vnodeIndex < 0) {
|
||||
if (pCur != NULL && pCur->vgroupIndex < 0) {
|
||||
return elem1;
|
||||
}
|
||||
|
||||
STSBlock* pBlock = &pTSBuf->block;
|
||||
|
||||
elem1.vnode = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
|
||||
elem1.vnode = pTSBuf->pData[pCur->vgroupIndex].info.vnode;
|
||||
elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE);
|
||||
elem1.tag = pBlock->tag;
|
||||
|
||||
|
@ -804,7 +804,7 @@ STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag) {
|
|||
return elem;
|
||||
}
|
||||
|
||||
pCur->vnodeIndex = j;
|
||||
pCur->vgroupIndex = j;
|
||||
pCur->blockIndex = blockIndex;
|
||||
tsBufGetBlock(pTSBuf, j, blockIndex);
|
||||
|
||||
|
@ -812,7 +812,7 @@ STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag) {
|
|||
}
|
||||
|
||||
STSCursor tsBufGetCursor(STSBuf* pTSBuf) {
|
||||
STSCursor c = {.vnodeIndex = -1};
|
||||
STSCursor c = {.vgroupIndex = -1};
|
||||
if (pTSBuf == NULL) {
|
||||
return c;
|
||||
}
|
||||
|
@ -825,9 +825,9 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) {
|
|||
return;
|
||||
}
|
||||
|
||||
// assert(pCur->vnodeIndex != -1 && pCur->tsIndex >= 0 && pCur->blockIndex >= 0);
|
||||
if (pCur->vnodeIndex != -1) {
|
||||
tsBufGetBlock(pTSBuf, pCur->vnodeIndex, pCur->blockIndex);
|
||||
// assert(pCur->vgroupIndex != -1 && pCur->tsIndex >= 0 && pCur->blockIndex >= 0);
|
||||
if (pCur->vgroupIndex != -1) {
|
||||
tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex);
|
||||
}
|
||||
|
||||
pTSBuf->cur = *pCur;
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include "tscompression.h"
|
||||
#include "ttime.h"
|
||||
#include "tscUtil.h" // todo move the function to common module
|
||||
#include "tdataformat.h"
|
||||
|
||||
#define DEFAULT_INTERN_BUF_SIZE 16384L
|
||||
|
||||
|
@ -3517,7 +3518,7 @@ STableQueryInfo *createTableQueryInfoImpl(SQueryRuntimeEnv *pRuntimeEnv, STableI
|
|||
pTableQueryInfo->lastKey = win.skey;
|
||||
|
||||
pTableQueryInfo->id = tableId;
|
||||
pTableQueryInfo->cur.vnodeIndex = -1;
|
||||
pTableQueryInfo->cur.vgroupIndex = -1;
|
||||
|
||||
initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, 100, 100, TSDB_DATA_TYPE_INT);
|
||||
return pTableQueryInfo;
|
||||
|
@ -3551,7 +3552,7 @@ void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, STableQueryInfo *pTableQ
|
|||
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
|
||||
|
||||
pTableQueryInfo->cur.order = pTableQueryInfo->cur.order ^ 1u;
|
||||
pTableQueryInfo->cur.vnodeIndex = -1;
|
||||
pTableQueryInfo->cur.vgroupIndex = -1;
|
||||
}
|
||||
|
||||
void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
|
||||
|
@ -3630,7 +3631,7 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, STableId* pTableId, STableQueryInfo *p
|
|||
|
||||
// both the master and supplement scan needs to set the correct ts comp start position
|
||||
if (pRuntimeEnv->pTSBuf != NULL) {
|
||||
if (pTableQueryInfo->cur.vnodeIndex == -1) {
|
||||
if (pTableQueryInfo->cur.vgroupIndex == -1) {
|
||||
pTableQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key;
|
||||
|
||||
tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, pTableQueryInfo->tag);
|
||||
|
@ -4243,7 +4244,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
|
|||
|
||||
pRuntimeEnv->pQuery = pQuery;
|
||||
pRuntimeEnv->pTSBuf = param;
|
||||
pRuntimeEnv->cur.vnodeIndex = -1;
|
||||
pRuntimeEnv->cur.vgroupIndex = -1;
|
||||
pRuntimeEnv->stableQuery = isSTableQuery;
|
||||
|
||||
if (param != NULL) {
|
||||
|
@ -4461,7 +4462,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
|
|||
taosArrayDestroy(g1);
|
||||
|
||||
if (pRuntimeEnv->pTSBuf != NULL) {
|
||||
if (pRuntimeEnv->cur.vnodeIndex == -1) {
|
||||
if (pRuntimeEnv->cur.vgroupIndex == -1) {
|
||||
int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key;
|
||||
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, tag);
|
||||
|
||||
|
@ -6302,11 +6303,8 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
|||
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
tsdbGetTableName(pQInfo->tsdb, &item->id, &data);
|
||||
|
||||
char* dst = pQuery->sdata[j]->data + i * (TSDB_TABLE_NAME_LEN + sizeof(int16_t));
|
||||
*(int16_t*) dst = strnlen(data, TSDB_TABLE_NAME_LEN);
|
||||
dst += sizeof(int16_t);
|
||||
|
||||
strncpy(dst, data, TSDB_TABLE_NAME_LEN);
|
||||
char* dst = pQuery->sdata[j]->data + i * (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(dst, data, TSDB_TABLE_NAME_LEN);
|
||||
tfree(data);
|
||||
|
||||
} else {// todo refactor, return the true length of binary|nchar data
|
||||
|
|
|
@ -35,8 +35,9 @@ enum {
|
|||
};
|
||||
|
||||
enum {
|
||||
TSDB_QUERY_TYPE_ALL_ROWS = 1,
|
||||
TSDB_QUERY_TYPE_LAST_ROW = 2,
|
||||
TSDB_QUERY_TYPE_ALL = 1,
|
||||
TSDB_QUERY_TYPE_LAST = 2,
|
||||
TSDB_QUERY_TYPE_EXTERNAL = 3,
|
||||
};
|
||||
|
||||
typedef struct SField {
|
||||
|
@ -58,7 +59,7 @@ typedef struct SDataBlockLoadInfo {
|
|||
} SDataBlockLoadInfo;
|
||||
|
||||
typedef struct SLoadCompBlockInfo {
|
||||
int32_t sid; /* meter sid */
|
||||
int32_t sid; /* table sid */
|
||||
int32_t fileId;
|
||||
int32_t fileListIndex;
|
||||
} SLoadCompBlockInfo;
|
||||
|
@ -92,7 +93,7 @@ typedef struct SBlockOrderSupporter {
|
|||
int32_t numOfTables;
|
||||
STableBlockInfo** pDataBlockInfo;
|
||||
int32_t* blockIndexArray;
|
||||
int32_t* numOfBlocksPerMeter;
|
||||
int32_t* numOfBlocksPerTable;
|
||||
} SBlockOrderSupporter;
|
||||
|
||||
typedef struct STsdbQueryHandle {
|
||||
|
@ -144,6 +145,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
|
|||
pQueryHandle->order = pCond->order;
|
||||
pQueryHandle->window = pCond->twindow;
|
||||
pQueryHandle->pTsdb = tsdb;
|
||||
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
|
||||
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
|
||||
|
||||
pQueryHandle->cur.fid = -1;
|
||||
|
@ -204,13 +206,23 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
|
|||
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList) {
|
||||
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList);
|
||||
|
||||
pQueryHandle->type = TSDB_QUERY_TYPE_LAST_ROW;
|
||||
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
|
||||
pQueryHandle->order = TSDB_ORDER_DESC;
|
||||
|
||||
changeQueryHandleForLastrowQuery(pQueryHandle);
|
||||
return pQueryHandle;
|
||||
}
|
||||
|
||||
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList) {
|
||||
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList);
|
||||
|
||||
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
|
||||
pQueryHandle->order = TSDB_ORDER_ASC;
|
||||
|
||||
// changeQueryHandleForLastrowQuery(pQueryHandle);
|
||||
return pQueryHandle;
|
||||
}
|
||||
|
||||
static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
|
||||
size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
|
||||
assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
|
||||
|
@ -689,7 +701,7 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
|
|||
}
|
||||
|
||||
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
|
||||
tfree(pSupporter->numOfBlocksPerMeter);
|
||||
tfree(pSupporter->numOfBlocksPerTable);
|
||||
tfree(pSupporter->blockIndexArray);
|
||||
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
|
@ -708,10 +720,10 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void*
|
|||
int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex];
|
||||
int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex];
|
||||
|
||||
if (leftTableBlockIndex > pSupporter->numOfBlocksPerMeter[leftTableIndex]) {
|
||||
if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftTableIndex]) {
|
||||
/* left block is empty */
|
||||
return 1;
|
||||
} else if (rightTableBlockIndex > pSupporter->numOfBlocksPerMeter[rightTableIndex]) {
|
||||
} else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightTableIndex]) {
|
||||
/* right block is empty */
|
||||
return -1;
|
||||
}
|
||||
|
@ -743,11 +755,11 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
|
|||
|
||||
SBlockOrderSupporter sup = {0};
|
||||
sup.numOfTables = numOfTables;
|
||||
sup.numOfBlocksPerMeter = calloc(1, sizeof(int32_t) * numOfTables);
|
||||
sup.numOfBlocksPerTable = calloc(1, sizeof(int32_t) * numOfTables);
|
||||
sup.blockIndexArray = calloc(1, sizeof(int32_t) * numOfTables);
|
||||
sup.pDataBlockInfo = calloc(1, POINTER_BYTES * numOfTables);
|
||||
|
||||
if (sup.numOfBlocksPerMeter == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
|
||||
if (sup.numOfBlocksPerTable == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
|
||||
cleanBlockOrderSupporter(&sup, 0);
|
||||
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -761,7 +773,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
|
|||
}
|
||||
|
||||
SCompBlock* pBlock = pTableCheck->pCompInfo->blocks;
|
||||
sup.numOfBlocksPerMeter[numOfQualTables] = pTableCheck->numOfBlocks;
|
||||
sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;
|
||||
|
||||
char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
|
||||
if (buf == NULL) {
|
||||
|
@ -779,7 +791,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
|
|||
|
||||
pBlockInfoEx->pTableCheckInfo = pTableCheck;
|
||||
// pBlockInfoEx->groupIdx = pTableCheckInfo[j]->groupIdx; // set the group index
|
||||
// pBlockInfoEx->blockIndex = pTableCheckInfo[j]->start + k; // set the block index in original meter
|
||||
// pBlockInfoEx->blockIndex = pTableCheckInfo[j]->start + k; // set the block index in original table
|
||||
cnt++;
|
||||
}
|
||||
|
||||
|
@ -788,7 +800,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
|
|||
|
||||
uTrace("%p create data blocks info struct completed, %d blocks in %d tables", pQueryHandle, cnt, numOfQualTables);
|
||||
|
||||
assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pMeterDataInfo[j]->numOfBlocks may be 0
|
||||
assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0
|
||||
sup.numOfTables = numOfQualTables;
|
||||
SLoserTreeInfo* pTree = NULL;
|
||||
|
||||
|
@ -808,8 +820,8 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
|
|||
pQueryHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfoEx[index];
|
||||
|
||||
// set data block index overflow, in order to disable the offset comparator
|
||||
if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerMeter[pos]) {
|
||||
sup.blockIndexArray[pos] = sup.numOfBlocksPerMeter[pos] + 1;
|
||||
if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) {
|
||||
sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
|
||||
}
|
||||
|
||||
tLoserTreeAdjust(pTree, pos + sup.numOfTables);
|
||||
|
@ -843,10 +855,14 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
|
|||
break;
|
||||
}
|
||||
|
||||
assert(numOfBlocks >= 0);
|
||||
uTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks,
|
||||
numOfTables, pQueryHandle->pFileGroup->fileId);
|
||||
|
||||
assert(numOfBlocks >= 0);
|
||||
if (numOfBlocks == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// todo return error code to query engine
|
||||
if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
|
@ -966,17 +982,22 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
|
|||
// todo consider the query time window, current last_row does not apply the query time window
|
||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
|
||||
TSKEY key = INT64_MIN;
|
||||
TSKEY key = 0;
|
||||
int32_t index = -1;
|
||||
|
||||
for(int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
||||
if (pCheckInfo->pTableObj->lastKey > key) {
|
||||
if (pCheckInfo->pTableObj->lastKey > key) { //todo lastKey should not be 0 by default
|
||||
key = pCheckInfo->pTableObj->lastKey;
|
||||
index = i;
|
||||
}
|
||||
}
|
||||
|
||||
// todo, there are no data in all the tables. opt performance
|
||||
if (index == -1) {
|
||||
return;
|
||||
}
|
||||
|
||||
// erase all other elements in array list, todo refactor
|
||||
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
|
|
Loading…
Reference in New Issue