Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/submit_req
This commit is contained in:
commit
c3bb58bf27
|
@ -270,10 +270,10 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, S
|
||||||
"CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm "
|
"CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm "
|
||||||
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
|
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
|
||||||
"STRICT '%s' WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d",
|
"STRICT '%s' WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d",
|
||||||
dbFName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile, pCfg->walFsyncPeriod,
|
dbFName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile,
|
||||||
pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages,
|
pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2,
|
||||||
pCfg->pageSize, prec, pCfg->replications, strictStr(pCfg->strict), pCfg->walLevel, pCfg->numOfVgroups,
|
pCfg->pages, pCfg->pageSize, prec, pCfg->replications, strictStr(pCfg->strict), pCfg->walLevel,
|
||||||
1 == pCfg->numOfStables);
|
pCfg->numOfVgroups, 1 == pCfg->numOfStables);
|
||||||
|
|
||||||
if (retentions) {
|
if (retentions) {
|
||||||
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions);
|
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions);
|
||||||
|
@ -501,7 +501,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* p
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
|
||||||
if (TSDB_SUPER_TABLE == pCfg->tableType) {
|
if (TSDB_SUPER_TABLE == pCfg->tableType) {
|
||||||
len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE STABLE `%s` (", tbName);
|
len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE STABLE `%s` (", tbName);
|
||||||
|
@ -691,9 +691,15 @@ static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** p
|
||||||
|
|
||||||
SNode* pProj = NULL;
|
SNode* pProj = NULL;
|
||||||
FOREACH(pProj, pProjects) {
|
FOREACH(pProj, pProjects) {
|
||||||
|
SExprNode* pExpr = (SExprNode*)pProj;
|
||||||
SColumnInfoData infoData = {0};
|
SColumnInfoData infoData = {0};
|
||||||
infoData.info.type = ((SExprNode*)pProj)->resType.type;
|
if (TSDB_DATA_TYPE_NULL == pExpr->resType.type) {
|
||||||
infoData.info.bytes = ((SExprNode*)pProj)->resType.bytes;
|
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
|
||||||
|
infoData.info.bytes = 0;
|
||||||
|
} else {
|
||||||
|
infoData.info.type = pExpr->resType.type;
|
||||||
|
infoData.info.bytes = pExpr->resType.bytes;
|
||||||
|
}
|
||||||
blockDataAppendColInfo(pBlock, &infoData);
|
blockDataAppendColInfo(pBlock, &infoData);
|
||||||
}
|
}
|
||||||
*pOutput = pBlock;
|
*pOutput = pBlock;
|
||||||
|
|
|
@ -1282,7 +1282,9 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
|
||||||
blockDataCleanup(pInfo->pSrcBlock);
|
blockDataCleanup(pInfo->pSrcBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void buildDeleteRange(TSKEY start, TSKEY end, uint64_t groupId, SSDataBlock* delRes) {
|
static void buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_t groupId, SSDataBlock* delRes) {
|
||||||
|
SStreamState* pState = pOp->pTaskInfo->streamInfo.pState;
|
||||||
|
|
||||||
SSDataBlock* pBlock = delRes;
|
SSDataBlock* pBlock = delRes;
|
||||||
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
|
@ -1290,25 +1292,42 @@ static void buildDeleteRange(TSKEY start, TSKEY end, uint64_t groupId, SSDataBlo
|
||||||
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||||
|
SColumnInfoData* pTbNameCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||||
colDataAppend(pStartCol, pBlock->info.rows, (const char*)&start, false);
|
colDataAppend(pStartCol, pBlock->info.rows, (const char*)&start, false);
|
||||||
colDataAppend(pEndCol, pBlock->info.rows, (const char*)&end, false);
|
colDataAppend(pEndCol, pBlock->info.rows, (const char*)&end, false);
|
||||||
colDataAppendNULL(pUidCol, pBlock->info.rows);
|
colDataAppendNULL(pUidCol, pBlock->info.rows);
|
||||||
colDataAppend(pGroupCol, pBlock->info.rows, (const char*)&groupId, false);
|
colDataAppend(pGroupCol, pBlock->info.rows, (const char*)&groupId, false);
|
||||||
colDataAppendNULL(pCalStartCol, pBlock->info.rows);
|
colDataAppendNULL(pCalStartCol, pBlock->info.rows);
|
||||||
colDataAppendNULL(pCalEndCol, pBlock->info.rows);
|
colDataAppendNULL(pCalEndCol, pBlock->info.rows);
|
||||||
|
|
||||||
|
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||||
|
|
||||||
|
void* tbname = NULL;
|
||||||
|
streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, groupId, &tbname);
|
||||||
|
if (tbname == NULL) {
|
||||||
|
colDataAppendNULL(pTableCol, pBlock->info.rows);
|
||||||
|
} else {
|
||||||
|
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
|
||||||
|
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
|
||||||
|
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
|
||||||
|
}
|
||||||
|
tdbFree(tbname);
|
||||||
|
|
||||||
pBlock->info.rows++;
|
pBlock->info.rows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void buildDeleteResult(SStreamFillSupporter* pFillSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
|
static void buildDeleteResult(SOperatorInfo* pOperator, TSKEY startTs, TSKEY endTs, uint64_t groupId,
|
||||||
SSDataBlock* delRes) {
|
SSDataBlock* delRes) {
|
||||||
|
SStreamFillOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SStreamFillSupporter* pFillSup = pInfo->pFillSup;
|
||||||
if (hasPrevWindow(pFillSup)) {
|
if (hasPrevWindow(pFillSup)) {
|
||||||
TSKEY start = getNextWindowTs(pFillSup->prev.key, &pFillSup->interval);
|
TSKEY start = getNextWindowTs(pFillSup->prev.key, &pFillSup->interval);
|
||||||
buildDeleteRange(start, endTs, groupId, delRes);
|
buildDeleteRange(pOperator, start, endTs, groupId, delRes);
|
||||||
} else if (hasNextWindow(pFillSup)) {
|
} else if (hasNextWindow(pFillSup)) {
|
||||||
TSKEY end = getPrevWindowTs(pFillSup->next.key, &pFillSup->interval);
|
TSKEY end = getPrevWindowTs(pFillSup->next.key, &pFillSup->interval);
|
||||||
buildDeleteRange(startTs, end, groupId, delRes);
|
buildDeleteRange(pOperator, startTs, end, groupId, delRes);
|
||||||
} else {
|
} else {
|
||||||
buildDeleteRange(startTs, endTs, groupId, delRes);
|
buildDeleteRange(pOperator, startTs, endTs, groupId, delRes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1319,7 +1338,7 @@ static void doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKE
|
||||||
SWinKey key = {.ts = startTs, .groupId = groupId};
|
SWinKey key = {.ts = startTs, .groupId = groupId};
|
||||||
if (!pInfo->pFillInfo->needFill) {
|
if (!pInfo->pFillInfo->needFill) {
|
||||||
streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key);
|
streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key);
|
||||||
buildDeleteResult(pInfo->pFillSup, startTs, endTs, groupId, pInfo->pDelRes);
|
buildDeleteResult(pOperator, startTs, endTs, groupId, pInfo->pDelRes);
|
||||||
} else {
|
} else {
|
||||||
STimeRange tw = {
|
STimeRange tw = {
|
||||||
.skey = startTs,
|
.skey = startTs,
|
||||||
|
@ -1715,9 +1734,9 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->srcRowIndex = 0;
|
pInfo->srcRowIndex = 0;
|
||||||
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
|
||||||
pOperator->fpSet =
|
pTaskInfo);
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, NULL);
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -654,7 +654,7 @@ cgroup_name(A) ::= NK_ID(B).
|
||||||
|
|
||||||
/************************************************ expression **********************************************************/
|
/************************************************ expression **********************************************************/
|
||||||
expr_or_subquery(A) ::= expression(B). { A = B; }
|
expr_or_subquery(A) ::= expression(B). { A = B; }
|
||||||
expr_or_subquery(A) ::= subquery(B). { A = B; }
|
//expr_or_subquery(A) ::= subquery(B). { A = createTempTableNode(pCxt, releaseRawExprNode(pCxt, B), NULL); }
|
||||||
|
|
||||||
expression(A) ::= literal(B). { A = B; }
|
expression(A) ::= literal(B). { A = B; }
|
||||||
expression(A) ::= pseudo_column(B). { A = B; }
|
expression(A) ::= pseudo_column(B). { A = B; }
|
||||||
|
|
|
@ -372,14 +372,42 @@ static bool isEmptyResultQuery(SNode* pStmt) {
|
||||||
return isEmptyResult;
|
return isEmptyResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void resetProjectNullTypeImpl(SNodeList* pProjects) {
|
||||||
|
SNode* pProj = NULL;
|
||||||
|
FOREACH(pProj, pProjects) {
|
||||||
|
SExprNode* pExpr = (SExprNode*)pProj;
|
||||||
|
if (TSDB_DATA_TYPE_NULL == pExpr->resType.type) {
|
||||||
|
pExpr->resType.type = TSDB_DATA_TYPE_VARCHAR;
|
||||||
|
pExpr->resType.bytes = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void resetProjectNullType(SNode* pStmt) {
|
||||||
|
switch (nodeType(pStmt)) {
|
||||||
|
case QUERY_NODE_SELECT_STMT:
|
||||||
|
resetProjectNullTypeImpl(((SSelectStmt*)pStmt)->pProjectionList);
|
||||||
|
break;
|
||||||
|
case QUERY_NODE_SET_OPERATOR: {
|
||||||
|
resetProjectNullTypeImpl(((SSetOperator*)pStmt)->pProjectionList);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery) {
|
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery) {
|
||||||
SCalcConstContext cxt = {.pParseCxt = pParseCxt,
|
SCalcConstContext cxt = {.pParseCxt = pParseCxt,
|
||||||
.msgBuf.buf = pParseCxt->pMsg,
|
.msgBuf.buf = pParseCxt->pMsg,
|
||||||
.msgBuf.len = pParseCxt->msgLen,
|
.msgBuf.len = pParseCxt->msgLen,
|
||||||
.code = TSDB_CODE_SUCCESS};
|
.code = TSDB_CODE_SUCCESS};
|
||||||
int32_t code = calcConstQuery(&cxt, pQuery->pRoot, false);
|
int32_t code = calcConstQuery(&cxt, pQuery->pRoot, false);
|
||||||
if (TSDB_CODE_SUCCESS == code && isEmptyResultQuery(pQuery->pRoot)) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pQuery->execMode = QUERY_EXEC_MODE_EMPTY_RESULT;
|
resetProjectNullType(pQuery->pRoot);
|
||||||
|
if (isEmptyResultQuery(pQuery->pRoot)) {
|
||||||
|
pQuery->execMode = QUERY_EXEC_MODE_EMPTY_RESULT;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue