[TD-1876]<fix>: fix bugs in arithmetic expression in super table join query processing.
This commit is contained in:
parent
f7d0f89e5e
commit
0b83e6746e
|
@ -65,7 +65,6 @@ static bool validateTagParams(tFieldList* pTagsList, tFieldList* pFieldList, SSq
|
||||||
static int32_t setObjFullName(char* fullName, const char* account, SStrToken* pDB, SStrToken* tableName, int32_t* len);
|
static int32_t setObjFullName(char* fullName, const char* account, SStrToken* pDB, SStrToken* tableName, int32_t* len);
|
||||||
|
|
||||||
static void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t nameLength);
|
static void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t nameLength);
|
||||||
static void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName);
|
|
||||||
|
|
||||||
static int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExprItem* pItem, bool finalResult);
|
static int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExprItem* pItem, bool finalResult);
|
||||||
static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes,
|
static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes,
|
||||||
|
@ -1625,12 +1624,11 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc,
|
static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc,
|
||||||
char* aliasName, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) {
|
tSQLExprItem* item, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) {
|
||||||
const char* msg1 = "not support column types";
|
const char* msg1 = "not support column types";
|
||||||
|
|
||||||
int16_t type = 0;
|
int16_t type = 0;
|
||||||
int16_t bytes = 0;
|
int16_t bytes = 0;
|
||||||
char columnName[TSDB_COL_NAME_LEN] = {0};
|
|
||||||
int32_t functionID = cvtFunc.execFuncId;
|
int32_t functionID = cvtFunc.execFuncId;
|
||||||
|
|
||||||
if (functionID == TSDB_FUNC_SPREAD) {
|
if (functionID == TSDB_FUNC_SPREAD) {
|
||||||
|
@ -1647,14 +1645,13 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
|
||||||
bytes = pSchema[pColIndex->columnIndex].bytes;
|
bytes = pSchema[pColIndex->columnIndex].bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (aliasName != NULL) {
|
|
||||||
tstrncpy(columnName, aliasName, sizeof(columnName));
|
|
||||||
} else {
|
|
||||||
getRevisedName(columnName, cvtFunc.originFuncId, sizeof(columnName) - 1, pSchema[pColIndex->columnIndex].name);
|
|
||||||
}
|
|
||||||
|
|
||||||
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, bytes, false);
|
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, bytes, false);
|
||||||
tstrncpy(pExpr->aliasName, columnName, sizeof(pExpr->aliasName));
|
if (item->aliasName != NULL) {
|
||||||
|
tstrncpy(pExpr->aliasName, item->aliasName, tListLen(pExpr->aliasName));
|
||||||
|
} else {
|
||||||
|
int32_t len = MIN(tListLen(pExpr->aliasName), item->pNode->token.n + 1);
|
||||||
|
tstrncpy(pExpr->aliasName, item->pNode->token.z, len);
|
||||||
|
}
|
||||||
|
|
||||||
if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != functionID) {
|
if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != functionID) {
|
||||||
pExpr->colInfo.flag |= TSDB_COL_NULL;
|
pExpr->colInfo.flag |= TSDB_COL_NULL;
|
||||||
|
@ -1674,7 +1671,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
|
||||||
// if it is not in the final result, do not add it
|
// if it is not in the final result, do not add it
|
||||||
SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
|
SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
|
||||||
if (finalResult) {
|
if (finalResult) {
|
||||||
insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, columnName, pExpr);
|
insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, pExpr->aliasName, pExpr);
|
||||||
} else {
|
} else {
|
||||||
tscColumnListInsert(pQueryInfo->colList, &(ids.ids[0]));
|
tscColumnListInsert(pQueryInfo->colList, &(ids.ids[0]));
|
||||||
}
|
}
|
||||||
|
@ -1939,8 +1936,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
||||||
|
|
||||||
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
||||||
|
|
||||||
if (pParamElem->pNode->nSQLOptr == TK_ALL) {
|
if (pParamElem->pNode->nSQLOptr == TK_ALL) { // select table.*
|
||||||
// select table.*
|
|
||||||
SStrToken tmpToken = pParamElem->pNode->colInfo;
|
SStrToken tmpToken = pParamElem->pNode->colInfo;
|
||||||
|
|
||||||
if (getTableIndexByName(&tmpToken, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
|
if (getTableIndexByName(&tmpToken, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1952,7 +1948,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
||||||
|
|
||||||
for (int32_t j = 0; j < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++j) {
|
for (int32_t j = 0; j < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++j) {
|
||||||
index.columnIndex = j;
|
index.columnIndex = j;
|
||||||
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex++, &index, finalResult) != 0) {
|
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem, colIndex++, &index, finalResult) != 0) {
|
||||||
return TSDB_CODE_TSC_INVALID_SQL;
|
return TSDB_CODE_TSC_INVALID_SQL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1970,7 +1966,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex + i, &index, finalResult) != 0) {
|
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem, colIndex + i, &index, finalResult) != 0) {
|
||||||
return TSDB_CODE_TSC_INVALID_SQL;
|
return TSDB_CODE_TSC_INVALID_SQL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2007,7 +2003,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
||||||
|
|
||||||
for (int32_t i = 0; i < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++i) {
|
for (int32_t i = 0; i < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++i) {
|
||||||
SColumnIndex index = {.tableIndex = j, .columnIndex = i};
|
SColumnIndex index = {.tableIndex = j, .columnIndex = i};
|
||||||
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex, &index, finalResult) != 0) {
|
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem, colIndex, &index, finalResult) != 0) {
|
||||||
return TSDB_CODE_TSC_INVALID_SQL;
|
return TSDB_CODE_TSC_INVALID_SQL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2240,10 +2236,6 @@ void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t nameLengt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName) {
|
|
||||||
snprintf(resultFieldName, maxLen, "%s(%s)", aAggs[functionId].aName, columnName);
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool isTablenameToken(SStrToken* token) {
|
static bool isTablenameToken(SStrToken* token) {
|
||||||
SStrToken tmpToken = *token;
|
SStrToken tmpToken = *token;
|
||||||
SStrToken tableToken = {0};
|
SStrToken tableToken = {0};
|
||||||
|
|
|
@ -878,37 +878,19 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
|
|
||||||
// compressed ts block
|
// compressed ts block
|
||||||
pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
|
pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
|
||||||
int32_t tsLen = 0;
|
|
||||||
int32_t numOfBlocks = 0;
|
|
||||||
|
|
||||||
if (pQueryInfo->tsBuf != NULL) {
|
if (pQueryInfo->tsBuf != NULL) {
|
||||||
int32_t vnodeId = htonl(pQueryMsg->head.vgId);
|
int32_t vnodeId = htonl(pQueryMsg->head.vgId);
|
||||||
STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, vnodeId);
|
int32_t code = dumpFileBlockByVnodeId(pQueryInfo->tsBuf, vnodeId, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks);
|
||||||
assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL); // this query should not be sent
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
||||||
// todo refactor, extract method and put into tsBuf.c
|
|
||||||
if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
|
|
||||||
int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
|
|
||||||
tscError("%p: fseek failed: %s", pSql, tstrerror(code));
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
|
pMsg += pQueryMsg->tsLen;
|
||||||
if (s != pBlockInfo->compLen) {
|
|
||||||
int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
|
|
||||||
tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
pMsg += pBlockInfo->compLen;
|
|
||||||
tsLen = pBlockInfo->compLen;
|
|
||||||
numOfBlocks = pBlockInfo->numOfBlocks;
|
|
||||||
}
|
|
||||||
|
|
||||||
pQueryMsg->tsLen = htonl(tsLen);
|
|
||||||
pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
|
|
||||||
if (pQueryInfo->tsBuf != NULL) {
|
|
||||||
pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
|
pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
|
||||||
|
pQueryMsg->tsLen = htonl(pQueryMsg->tsLen);
|
||||||
|
pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
|
int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
|
||||||
|
|
|
@ -189,7 +189,6 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
|
||||||
|
|
||||||
STSElem el1 = tsBufGetElem(pSupporter1->pTSBuf);
|
STSElem el1 = tsBufGetElem(pSupporter1->pTSBuf);
|
||||||
if (el1.vnode < 0) { // no data exists, abort
|
if (el1.vnode < 0) { // no data exists, abort
|
||||||
completed = true;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -135,6 +135,8 @@ int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf);
|
||||||
|
|
||||||
void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId);
|
void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId);
|
||||||
|
|
||||||
|
int32_t dumpFileBlockByVnodeId(STSBuf* pTSBuf, int32_t vnodeId, void* buf, int32_t* len, int32_t* numOfBlocks);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -6373,12 +6373,13 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
|
||||||
STSBuf *pTSBuf = NULL;
|
STSBuf *pTSBuf = NULL;
|
||||||
if (pQueryMsg->tsLen > 0) { // open new file to save the result
|
if (pQueryMsg->tsLen > 0) { // open new file to save the result
|
||||||
char *tsBlock = (char *) pQueryMsg + pQueryMsg->tsOffset;
|
char *tsBlock = (char *) pQueryMsg + pQueryMsg->tsOffset;
|
||||||
pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder, vgId);
|
pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder, vgId);
|
||||||
|
|
||||||
tsBufResetPos(pTSBuf);
|
tsBufResetPos(pTSBuf);
|
||||||
bool ret = tsBufNextPos(pTSBuf);
|
bool ret = tsBufNextPos(pTSBuf);
|
||||||
|
|
||||||
UNUSED(ret);
|
UNUSED(ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -219,6 +219,15 @@ static void shrinkBuffer(STSList* ptsData) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t getTagAreaLength(tVariant* pa) {
|
||||||
|
int32_t t = sizeof(pa->nLen) * 2 + sizeof(pa->nType);
|
||||||
|
if (pa->nType != TSDB_DATA_TYPE_NULL) {
|
||||||
|
t += pa->nLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
|
||||||
static void writeDataToDisk(STSBuf* pTSBuf) {
|
static void writeDataToDisk(STSBuf* pTSBuf) {
|
||||||
if (pTSBuf->tsData.len == 0) {
|
if (pTSBuf->tsData.len == 0) {
|
||||||
return;
|
return;
|
||||||
|
@ -244,12 +253,17 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
|
||||||
*/
|
*/
|
||||||
int32_t metaLen = 0;
|
int32_t metaLen = 0;
|
||||||
metaLen += (int32_t)fwrite(&pBlock->tag.nType, 1, sizeof(pBlock->tag.nType), pTSBuf->f);
|
metaLen += (int32_t)fwrite(&pBlock->tag.nType, 1, sizeof(pBlock->tag.nType), pTSBuf->f);
|
||||||
metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f);
|
|
||||||
|
|
||||||
|
int32_t trueLen = pBlock->tag.nLen;
|
||||||
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
|
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f);
|
||||||
metaLen += (int32_t)fwrite(pBlock->tag.pz, 1, (size_t)pBlock->tag.nLen, pTSBuf->f);
|
metaLen += (int32_t)fwrite(pBlock->tag.pz, 1, (size_t)pBlock->tag.nLen, pTSBuf->f);
|
||||||
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
|
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
|
||||||
metaLen += (int32_t)fwrite(&pBlock->tag.i64Key, 1, sizeof(int64_t), pTSBuf->f);
|
metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f);
|
||||||
|
metaLen += (int32_t)fwrite(&pBlock->tag.i64Key, 1, (size_t) pBlock->tag.nLen, pTSBuf->f);
|
||||||
|
} else {
|
||||||
|
trueLen = 0;
|
||||||
|
metaLen += (int32_t)fwrite(&trueLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f);
|
||||||
}
|
}
|
||||||
|
|
||||||
fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
|
fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
|
||||||
|
@ -257,6 +271,9 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
|
||||||
fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
|
fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
|
||||||
fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
|
fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
|
||||||
|
|
||||||
|
metaLen += fwrite(&trueLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f);
|
||||||
|
assert(metaLen == getTagAreaLength(&pBlock->tag));
|
||||||
|
|
||||||
int32_t blockSize = metaLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;
|
int32_t blockSize = metaLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;
|
||||||
pTSBuf->fileSize += blockSize;
|
pTSBuf->fileSize += blockSize;
|
||||||
|
|
||||||
|
@ -291,17 +308,22 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
|
||||||
pBlock->padding = 0;
|
pBlock->padding = 0;
|
||||||
pBlock->numOfElem = 0;
|
pBlock->numOfElem = 0;
|
||||||
|
|
||||||
|
int32_t offset = -1;
|
||||||
|
|
||||||
if (order == TSDB_ORDER_DESC) {
|
if (order == TSDB_ORDER_DESC) {
|
||||||
/*
|
/*
|
||||||
* set the right position for the reversed traverse, the reversed traverse is started from
|
* set the right position for the reversed traverse, the reversed traverse is started from
|
||||||
* the end of each comp data block
|
* the end of each comp data block
|
||||||
*/
|
*/
|
||||||
int32_t ret = fseek(pTSBuf->f, -(int32_t)(sizeof(pBlock->padding)), SEEK_CUR);
|
int32_t prev = -(int32_t) (sizeof(pBlock->padding) + sizeof(pBlock->tag.nLen));
|
||||||
size_t sz = fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
|
int32_t ret = fseek(pTSBuf->f, prev, SEEK_CUR);
|
||||||
|
size_t sz = fread(&pBlock->padding, 1, sizeof(pBlock->padding), pTSBuf->f);
|
||||||
|
sz = fread(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f);
|
||||||
UNUSED(sz);
|
UNUSED(sz);
|
||||||
|
|
||||||
pBlock->compLen = pBlock->padding;
|
pBlock->compLen = pBlock->padding;
|
||||||
int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag);
|
|
||||||
|
offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + getTagAreaLength(&pBlock->tag);
|
||||||
ret = fseek(pTSBuf->f, -offset, SEEK_CUR);
|
ret = fseek(pTSBuf->f, -offset, SEEK_CUR);
|
||||||
UNUSED(ret);
|
UNUSED(ret);
|
||||||
}
|
}
|
||||||
|
@ -320,7 +342,7 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
|
||||||
|
|
||||||
sz = fread(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f);
|
sz = fread(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f);
|
||||||
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
|
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
|
||||||
sz = fread(&pBlock->tag.i64Key, sizeof(int64_t), 1, pTSBuf->f);
|
sz = fread(&pBlock->tag.i64Key, (size_t) pBlock->tag.nLen, 1, pTSBuf->f);
|
||||||
}
|
}
|
||||||
|
|
||||||
sz = fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
|
sz = fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
|
||||||
|
@ -328,7 +350,6 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
|
||||||
sz = fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
|
sz = fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
|
||||||
UNUSED(sz);
|
UNUSED(sz);
|
||||||
sz = fread(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
|
sz = fread(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
|
||||||
UNUSED(sz);
|
|
||||||
|
|
||||||
if (decomp) {
|
if (decomp) {
|
||||||
pTSBuf->tsData.len =
|
pTSBuf->tsData.len =
|
||||||
|
@ -338,11 +359,20 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
|
||||||
|
|
||||||
// read the comp length at the length of comp block
|
// read the comp length at the length of comp block
|
||||||
sz = fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
|
sz = fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
|
||||||
|
assert(pBlock->padding == pBlock->compLen);
|
||||||
|
|
||||||
|
int32_t n = 0;
|
||||||
|
sz = fread(&n, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
|
||||||
|
if (pBlock->tag.nType == TSDB_DATA_TYPE_NULL) {
|
||||||
|
assert(n == 0);
|
||||||
|
} else {
|
||||||
|
assert(n == pBlock->tag.nLen);
|
||||||
|
}
|
||||||
|
|
||||||
UNUSED(sz);
|
UNUSED(sz);
|
||||||
|
|
||||||
// for backwards traverse, set the start position at the end of previous block
|
// for backwards traverse, set the start position at the end of previous block
|
||||||
if (order == TSDB_ORDER_DESC) {
|
if (order == TSDB_ORDER_DESC) {
|
||||||
int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag);
|
|
||||||
int32_t r = fseek(pTSBuf->f, -offset, SEEK_CUR);
|
int32_t r = fseek(pTSBuf->f, -offset, SEEK_CUR);
|
||||||
UNUSED(r);
|
UNUSED(r);
|
||||||
}
|
}
|
||||||
|
@ -479,7 +509,7 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int
|
||||||
if (pTSBuf->cur.order == TSDB_ORDER_DESC) {
|
if (pTSBuf->cur.order == TSDB_ORDER_DESC) {
|
||||||
STSBlock* pBlock = &pTSBuf->block;
|
STSBlock* pBlock = &pTSBuf->block;
|
||||||
int32_t compBlockSize =
|
int32_t compBlockSize =
|
||||||
pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag);
|
pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + getTagAreaLength(&pBlock->tag);
|
||||||
int32_t ret = fseek(pTSBuf->f, -compBlockSize, SEEK_CUR);
|
int32_t ret = fseek(pTSBuf->f, -compBlockSize, SEEK_CUR);
|
||||||
UNUSED(ret);
|
UNUSED(ret);
|
||||||
}
|
}
|
||||||
|
@ -507,7 +537,7 @@ static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tVariantCompare(&pTSBuf->block.tag, tag) == 0) {
|
if (tVariantCompare(&pTSBuf->block.tag, tag) == 0) {
|
||||||
return i;
|
return (pTSBuf->cur.order == TSDB_ORDER_ASC)? i: (pBlockInfo->numOfBlocks - (i + 1));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -994,3 +1024,28 @@ void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId) {
|
||||||
(*vnodeId)[i] = pTSBuf->pData[i].info.vnode;
|
(*vnodeId)[i] = pTSBuf->pData[i].info.vnode;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t dumpFileBlockByVnodeId(STSBuf* pTSBuf, int32_t vnodeId, void* buf, int32_t* len, int32_t* numOfBlocks) {
|
||||||
|
STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId);
|
||||||
|
|
||||||
|
*len = 0;
|
||||||
|
*numOfBlocks = 0;
|
||||||
|
|
||||||
|
if (fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
|
||||||
|
int code = TAOS_SYSTEM_ERROR(ferror(pTSBuf->f));
|
||||||
|
// qError("%p: fseek failed: %s", pSql, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t s = fread(buf, 1, pBlockInfo->compLen, pTSBuf->f);
|
||||||
|
if (s != pBlockInfo->compLen) {
|
||||||
|
int code = TAOS_SYSTEM_ERROR(ferror(pTSBuf->f));
|
||||||
|
// tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
*len = pBlockInfo->compLen;
|
||||||
|
*numOfBlocks = pBlockInfo->numOfBlocks;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
|
@ -472,13 +472,20 @@ void mergeIdenticalVnodeBufferTest() {
|
||||||
tsBufFlush(pTSBuf2);
|
tsBufFlush(pTSBuf2);
|
||||||
|
|
||||||
tsBufMerge(pTSBuf1, pTSBuf2);
|
tsBufMerge(pTSBuf1, pTSBuf2);
|
||||||
EXPECT_EQ(pTSBuf1->numOfVnodes, 1);
|
EXPECT_EQ(pTSBuf1->numOfVnodes, 2);
|
||||||
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);
|
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);
|
||||||
|
|
||||||
tsBufResetPos(pTSBuf1);
|
tsBufResetPos(pTSBuf1);
|
||||||
|
|
||||||
|
int32_t count = 0;
|
||||||
while (tsBufNextPos(pTSBuf1)) {
|
while (tsBufNextPos(pTSBuf1)) {
|
||||||
STSElem elem = tsBufGetElem(pTSBuf1);
|
STSElem elem = tsBufGetElem(pTSBuf1);
|
||||||
EXPECT_EQ(elem.vnode, 12);
|
|
||||||
|
if (count++ < numOfTags * num) {
|
||||||
|
EXPECT_EQ(elem.vnode, 12);
|
||||||
|
} else {
|
||||||
|
EXPECT_EQ(elem.vnode, 77);
|
||||||
|
}
|
||||||
|
|
||||||
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts);
|
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts);
|
||||||
}
|
}
|
||||||
|
|
|
@ -316,6 +316,8 @@ if $data03 != 0 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
select count(join_mt0.c1), first(join_mt0.c1)-last(join_mt1.c1), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL);
|
select count(join_mt0.c1), first(join_mt0.c1)-last(join_mt1.c1), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL);
|
||||||
|
select count(join_mt0.c1), first(join_mt0.c1)/count(*), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL);
|
||||||
select count(join_mt0.c1), first(join_mt0.c1)-last(join_mt0.c1), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL);
|
select count(join_mt0.c1), first(join_mt0.c1)-last(join_mt0.c1), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL);
|
||||||
|
select last(join_mt0.c1) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL);
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue