Merge branch '3.0' into feature/TD-14481-3.0

This commit is contained in:
Cary Xu 2022-05-30 20:28:51 +08:00
commit 77a3cb5ee8
10 changed files with 94 additions and 53 deletions

View File

@ -698,7 +698,7 @@ SELECT INTERP(field_name) FROM { tb_name | stb_name } WHERE ts='timestamp' [FILL
SELECT TAIL(field_name, k, offset_val) FROM {tb_name | stb_name} [WHERE clause];
```
**功能说明**:返回跳过最后 offset_value 个,然后取连续 k 个记录,不忽略 NULL 值。offset_val 可以不输入。此时返回最后的 k 个记录。当有 offset_val 输入的情况下,该函数功能等效于 `order by ts desc LIMIT k OFFSET offset_val`
**功能说明**:返回跳过最后 offset_val 个,然后取连续 k 个记录,不忽略 NULL 值。offset_val 可以不输入。此时返回最后的 k 个记录。当有 offset_val 输入的情况下,该函数功能等效于 `order by ts desc LIMIT k OFFSET offset_val`
**参数范围**k: [1,100] offset_val: [0,100]。

View File

@ -605,6 +605,10 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
* @param pCols
*/
int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) {
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printf("%s:%d ts: %" PRIi64 " sver:%d maxCols:%" PRIi16 " nCols:%" PRIi16 ", nRows:%d\n", __func__, __LINE__,
TD_ROW_KEY(pRow), TD_ROW_SVER(pRow), pCols->maxCols, pCols->numOfCols, pCols->numOfRows);
#endif
if (TD_IS_TP_ROW(pRow)) {
return tdAppendTpRowToDataCol(pRow, pSchema, pCols, isMerge);
} else if (TD_IS_KV_ROW(pRow)) {

View File

@ -79,7 +79,8 @@ struct STsdb {
struct STable {
uint64_t tid;
uint64_t uid;
STSchema *pSchema;
STSchema *pSchema; // latest schema
STSchema *pCacheSchema; // cached cache
};
#define TABLE_TID(t) (t)->tid
@ -181,12 +182,15 @@ int tsdbUnlockRepo(STsdb *pTsdb);
static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy,
int32_t version) {
if ((version != -1) && (schemaVersion(pTable->pSchema) != version)) {
taosMemoryFreeClear(pTable->pSchema);
pTable->pSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version);
if ((version < 0) || (schemaVersion(pTable->pSchema) == version)) {
return pTable->pSchema;
}
return pTable->pSchema;
if (!pTable->pCacheSchema || (schemaVersion(pTable->pCacheSchema) != version)) {
taosMemoryFreeClear(pTable->pCacheSchema);
pTable->pCacheSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version);
}
return pTable->pCacheSchema;
}
// tsdbMemTable.h

View File

@ -300,7 +300,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
pSW = metaGetTableSchema(pMeta, quid, sver, 0);
if (!pSW) return NULL;
tdInitTSchemaBuilder(&sb, sver);
tdInitTSchemaBuilder(&sb, pSW->version);
for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);

View File

@ -441,7 +441,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
// TODO: use the proper schema instead of 0, and cache STSchema in cache
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, 1);
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1);
if (!pTSchema) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return TSDB_CODE_FAILED;

View File

@ -466,7 +466,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) {
pTbData = (STbData *)pNode->pData;
pCommitIter = pCommith->iters + i;
pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, -1); // TODO: schema version
pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, -1);
if (pTSchema) {
pCommitIter->pIter = tSkipListCreateIter(pTbData->pData);
@ -475,7 +475,8 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) {
pCommitIter->pTable = (STable *)taosMemoryMalloc(sizeof(STable));
pCommitIter->pTable->uid = pTbData->uid;
pCommitIter->pTable->tid = pTbData->uid;
pCommitIter->pTable->pSchema = pTSchema; // metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 0);
pCommitIter->pTable->pSchema = pTSchema;
pCommitIter->pTable->pCacheSchema = NULL;
}
}
tSkipListDestroyIter(pSlIter);
@ -490,6 +491,7 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) {
tSkipListDestroyIter(pCommith->iters[i].pIter);
if (pCommith->iters[i].pTable) {
tdFreeSchema(pCommith->iters[i].pTable->pSchema);
tdFreeSchema(pCommith->iters[i].pTable->pCacheSchema);
taosMemoryFreeClear(pCommith->iters[i].pTable);
}
}
@ -914,7 +916,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
while (bidx < nBlocks) {
if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) {
// Set commit table
pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 1); // TODO: schema version
pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, -1); // TODO: schema version
if (!pTSchema) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;

View File

@ -3384,7 +3384,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
// todo add more information about exchange operation
int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN) {
*order = TSDB_ORDER_ASC;
*scanFlag = MAIN_SCAN;
return TSDB_CODE_SUCCESS;
@ -3499,14 +3499,15 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
}
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
if(result == NULL || length == NULL){
if (result == NULL || length == NULL) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length
int32_t totalSize = sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length
int32_t totalSize =
sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
*result = (char*)taosMemoryCalloc(1, totalSize);
if (*result == NULL) {
@ -3568,11 +3569,11 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
}
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
if(result == NULL){
if (result == NULL) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
// int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
int32_t length = *(int32_t*)(result);
@ -4512,8 +4513,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STimeWindowAggSupp twSup = {.waterMark = pTableScanNode->watermark,
.calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
STimeWindowAggSupp twSup = {
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
tsdbReaderT pDataReader = NULL;
if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
@ -4527,9 +4528,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else {
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
}
SArray* tableIdList = extractTableIdList(pTableListInfo);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle,
tableIdList, pTableScanNode, pTaskInfo, &twSup, pTableScanNode->tsColId);
SArray* tableIdList = extractTableIdList(pTableListInfo);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode,
pTaskInfo, &twSup, pTableScanNode->tsColId);
taosArrayDestroy(tableIdList);
return pOperator;
@ -4996,25 +4997,25 @@ _error:
return NULL;
}
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t *length){
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
int32_t code = TDB_CODE_SUCCESS;
char *pCurrent = NULL;
char* pCurrent = NULL;
int32_t currLength = 0;
if(ops->fpSet.encodeResultRow){
if(result == NULL || length == NULL){
if (ops->fpSet.encodeResultRow) {
if (result == NULL || length == NULL) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
code = ops->fpSet.encodeResultRow(ops, &pCurrent, &currLength);
if(code != TDB_CODE_SUCCESS){
if(*result != NULL){
if (code != TDB_CODE_SUCCESS) {
if (*result != NULL) {
taosMemoryFree(*result);
*result = NULL;
}
return code;
}
if(*result == NULL){
if (*result == NULL) {
*result = (char*)taosMemoryCalloc(1, currLength + sizeof(int32_t));
if (*result == NULL) {
taosMemoryFree(pCurrent);
@ -5022,9 +5023,9 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t *length){
}
memcpy(*result + sizeof(int32_t), pCurrent, currLength);
*(int32_t*)(*result) = currLength + sizeof(int32_t);
}else{
} else {
int32_t sizePre = *(int32_t*)(*result);
char* tmp = (char*)taosMemoryRealloc(*result, sizePre + currLength);
char* tmp = (char*)taosMemoryRealloc(*result, sizePre + currLength);
if (tmp == NULL) {
taosMemoryFree(pCurrent);
taosMemoryFree(*result);
@ -5041,33 +5042,33 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t *length){
for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
code = encodeOperator(ops->pDownstream[i], result, length);
if(code != TDB_CODE_SUCCESS){
if (code != TDB_CODE_SUCCESS) {
return code;
}
}
return TDB_CODE_SUCCESS;
}
int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length){
int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length) {
int32_t code = TDB_CODE_SUCCESS;
if(ops->fpSet.decodeResultRow){
if(result == NULL){
if (ops->fpSet.decodeResultRow) {
if (result == NULL) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
ASSERT(length == *(int32_t*)result);
char* data = result + sizeof(int32_t);
code = ops->fpSet.decodeResultRow(ops, data);
if(code != TDB_CODE_SUCCESS){
if (code != TDB_CODE_SUCCESS) {
return code;
}
int32_t totalLength = *(int32_t*)result;
int32_t dataLength = *(int32_t*)data;
if(totalLength == dataLength + sizeof(int32_t)) { // the last data
if (totalLength == dataLength + sizeof(int32_t)) { // the last data
result = NULL;
length = 0;
}else{
} else {
result += dataLength;
*(int32_t*)(result) = totalLength - dataLength;
length = totalLength - dataLength;
@ -5076,7 +5077,7 @@ int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length){
for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
code = decodeOperator(ops->pDownstream[i], result, length);
if(code != TDB_CODE_SUCCESS){
if (code != TDB_CODE_SUCCESS) {
return code;
}
}

View File

@ -752,18 +752,30 @@ static bool isMultiResFunc(SNode* pNode) {
return (QUERY_NODE_COLUMN == nodeType(pParam) ? 0 == strcmp(((SColumnNode*)pParam)->colName, "*") : false);
}
static EDealRes translateUnaryOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
static int32_t rewriteNegativeOperator(SNode** pOp) {
SNode* pRes = NULL;
int32_t code = scalarCalculateConstants(*pOp, &pRes);
if (TSDB_CODE_SUCCESS == code) {
*pOp = pRes;
}
return code;
}
static EDealRes translateUnaryOperator(STranslateContext* pCxt, SOperatorNode** pOpRef) {
SOperatorNode* pOp = *pOpRef;
if (OP_TYPE_MINUS == pOp->opType) {
if (!IS_MATHABLE_TYPE(((SExprNode*)(pOp->pLeft))->resType.type)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName);
}
pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE;
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
pCxt->errCode = rewriteNegativeOperator((SNode**)pOpRef);
} else {
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
}
return DEAL_RES_CONTINUE;
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_CONTINUE : DEAL_RES_ERROR;
}
static EDealRes translateArithmeticOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
@ -824,7 +836,9 @@ static EDealRes translateJsonOperator(STranslateContext* pCxt, SOperatorNode* pO
return DEAL_RES_CONTINUE;
}
static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode** pOpRef) {
SOperatorNode* pOp = *pOpRef;
if (isMultiResFunc(pOp->pLeft)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName);
}
@ -833,7 +847,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
}
if (nodesIsUnaryOp(pOp)) {
return translateUnaryOperator(pCxt, pOp);
return translateUnaryOperator(pCxt, pOpRef);
} else if (nodesIsArithmeticOp(pOp)) {
return translateArithmeticOperator(pCxt, pOp);
} else if (nodesIsComparisonOp(pOp)) {
@ -992,7 +1006,7 @@ static EDealRes doTranslateExpr(SNode** pNode, void* pContext) {
case QUERY_NODE_VALUE:
return translateValue(pCxt, (SValueNode*)*pNode);
case QUERY_NODE_OPERATOR:
return translateOperator(pCxt, (SOperatorNode*)*pNode);
return translateOperator(pCxt, (SOperatorNode**)pNode);
case QUERY_NODE_FUNCTION:
return translateFunction(pCxt, (SFunctionNode*)*pNode);
case QUERY_NODE_LOGIC_CONDITION:
@ -1891,9 +1905,9 @@ static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartiti
return translateExprList(pCxt, pPartitionByList);
}
static int32_t translateWhere(STranslateContext* pCxt, SNode* pWhere) {
static int32_t translateWhere(STranslateContext* pCxt, SNode** pWhere) {
pCxt->currClause = SQL_CLAUSE_WHERE;
return translateExpr(pCxt, &pWhere);
return translateExpr(pCxt, pWhere);
}
static int32_t translateFrom(STranslateContext* pCxt, SSelectStmt* pSelect) {
@ -1964,7 +1978,7 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
pCxt->pCurrStmt = pSelect;
int32_t code = translateFrom(pCxt, pSelect);
if (TSDB_CODE_SUCCESS == code) {
code = translateWhere(pCxt, pSelect->pWhere);
code = translateWhere(pCxt, &pSelect->pWhere);
}
if (TSDB_CODE_SUCCESS == code) {
code = translatePartitionBy(pCxt, pSelect->pPartitionByList);

View File

@ -44,6 +44,8 @@ TEST_F(ParserSelectTest, constant) {
"timestamp '2022-02-09 17:30:20', true, false, 15s FROM t1");
run("SELECT 123 + 45 FROM t1 WHERE 2 - 1");
run("SELECT * FROM t1 WHERE -2");
}
TEST_F(ParserSelectTest, expression) {
@ -76,6 +78,12 @@ TEST_F(ParserSelectTest, pseudoColumnSemanticCheck) {
run("SELECT TBNAME FROM (SELECT * FROM st1s1)", TSDB_CODE_PAR_INVALID_TBNAME, PARSER_STAGE_TRANSLATE);
}
TEST_F(ParserSelectTest, aggFunc) {
useDb("root", "test");
run("SELECT LEASTSQUARES(c1, -1, 1) FROM t1");
}
TEST_F(ParserSelectTest, multiResFunc) {
useDb("root", "test");

View File

@ -27,6 +27,14 @@ TEST_F(PlanSuperTableTest, pseudoCol) {
run("SELECT TBNAME, tag1, tag2 FROM st1");
}
TEST_F(PlanSuperTableTest, pseudoColOnChildTable) {
useDb("root", "test");
run("SELECT TBNAME FROM st1s1");
run("SELECT TBNAME, tag1, tag2 FROM st1s1");
}
TEST_F(PlanSuperTableTest, orderBy) {
useDb("root", "test");