commit
14bbac1520
|
@ -210,7 +210,8 @@ int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaI
|
|||
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta);
|
||||
|
||||
SColumn* tscColumnClone(const SColumn* src);
|
||||
bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid);
|
||||
void tscColumnCopy(SColumn* pDest, const SColumn* pSrc);
|
||||
int32_t tscColumnExists(SArray* pColumnList, int32_t columnId, uint64_t uid);
|
||||
SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema);
|
||||
void tscColumnListDestroy(SArray* pColList);
|
||||
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
|
||||
|
|
|
@ -1156,27 +1156,6 @@ static void insertBatchClean(STscStmt* pStmt) {
|
|||
|
||||
tfree(pCmd->insertParam.pTableNameList);
|
||||
|
||||
/*
|
||||
STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL);
|
||||
|
||||
STableDataBlocks* pOneTableBlock = *p;
|
||||
|
||||
while (1) {
|
||||
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
|
||||
|
||||
pOneTableBlock->size = sizeof(SSubmitBlk);
|
||||
|
||||
pBlocks->numOfRows = 0;
|
||||
|
||||
p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p);
|
||||
if (p == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
pOneTableBlock = *p;
|
||||
}
|
||||
*/
|
||||
|
||||
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks);
|
||||
pCmd->insertParam.numOfTables = 0;
|
||||
|
||||
|
@ -1499,7 +1478,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
|
|||
pRes->numOfRows = 1;
|
||||
|
||||
strtolower(pSql->sqlstr, sql);
|
||||
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
|
||||
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
|
||||
|
||||
if (tscIsInsertData(pSql->sqlstr)) {
|
||||
pStmt->isInsert = true;
|
||||
|
@ -1604,7 +1583,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
|
|||
if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) {
|
||||
SHashObj* hashList = pCmd->insertParam.pTableBlockHashList;
|
||||
pCmd->insertParam.pTableBlockHashList = NULL;
|
||||
tscResetSqlCmd(pCmd, true);
|
||||
tscResetSqlCmd(pCmd, false);
|
||||
pCmd->insertParam.pTableBlockHashList = hashList;
|
||||
}
|
||||
|
||||
|
@ -1663,7 +1642,7 @@ int taos_stmt_close(TAOS_STMT* stmt) {
|
|||
} else {
|
||||
if (pStmt->multiTbInsert) {
|
||||
taosHashCleanup(pStmt->mtb.pTableHash);
|
||||
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, true);
|
||||
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, false);
|
||||
taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList);
|
||||
pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL;
|
||||
taosArrayDestroy(pStmt->mtb.tags);
|
||||
|
|
|
@ -1699,7 +1699,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32
|
|||
|
||||
// arithmetic expression always return result in the format of double float
|
||||
pExprInfo->base.resBytes = sizeof(double);
|
||||
pExprInfo->base.interBytes = sizeof(double);
|
||||
pExprInfo->base.interBytes = 0;
|
||||
pExprInfo->base.resType = TSDB_DATA_TYPE_DOUBLE;
|
||||
|
||||
pExprInfo->base.functionId = TSDB_FUNC_ARITHM;
|
||||
|
@ -1934,14 +1934,14 @@ SExprInfo* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tab
|
|||
index.columnIndex = colIndex;
|
||||
}
|
||||
|
||||
return tscExprAppend(pQueryInfo, functionId, &index, pSchema->type, pSchema->bytes, colId, pSchema->bytes,
|
||||
return tscExprAppend(pQueryInfo, functionId, &index, pSchema->type, pSchema->bytes, colId, 0,
|
||||
(functionId == TSDB_FUNC_TAGPRJ));
|
||||
}
|
||||
|
||||
SExprInfo* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
|
||||
SColumnIndex* pIndex, SSchema* pColSchema, int16_t flag, int16_t colId) {
|
||||
SExprInfo* pExpr = tscExprInsert(pQueryInfo, outputColIndex, functionId, pIndex, pColSchema->type,
|
||||
pColSchema->bytes, colId, pColSchema->bytes, TSDB_COL_IS_TAG(flag));
|
||||
pColSchema->bytes, colId, 0, TSDB_COL_IS_TAG(flag));
|
||||
tstrncpy(pExpr->base.aliasName, pColSchema->name, sizeof(pExpr->base.aliasName));
|
||||
tstrncpy(pExpr->base.token, pColSchema->name, sizeof(pExpr->base.token));
|
||||
|
||||
|
@ -2096,7 +2096,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
|
|||
type = pSchema->type;
|
||||
bytes = pSchema->bytes;
|
||||
}
|
||||
|
||||
|
||||
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, getNewResColId(pCmd), bytes, false);
|
||||
tstrncpy(pExpr->base.aliasName, name, tListLen(pExpr->base.aliasName));
|
||||
|
||||
|
@ -2168,6 +2168,17 @@ static void updateLastScanOrderIfNeeded(SQueryInfo* pQueryInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
static UNUSED_FUNC void updateFunctionInterBuf(SQueryInfo* pQueryInfo, bool superTable) {
|
||||
size_t numOfExpr = tscNumOfExprs(pQueryInfo);
|
||||
for (int32_t i = 0; i < numOfExpr; ++i) {
|
||||
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
|
||||
|
||||
int32_t param = (int32_t)pExpr->base.param[0].i64;
|
||||
getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, pExpr->base.functionId, param, &pExpr->base.resType, &pExpr->base.resBytes,
|
||||
&pExpr->base.interBytes, 0, superTable);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSqlExprItem* pItem, bool finalResult) {
|
||||
STableMetaInfo* pTableMetaInfo = NULL;
|
||||
int32_t functionId = pItem->pNode->functionId;
|
||||
|
@ -2277,10 +2288,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
case TSDB_FUNC_LEASTSQR: {
|
||||
// 1. valid the number of parameters
|
||||
int32_t numOfParams = (pItem->pNode->pParam == NULL)? 0: (int32_t) taosArrayGetSize(pItem->pNode->pParam);
|
||||
|
||||
// no parameters or more than one parameter for function
|
||||
if (pItem->pNode->pParam == NULL ||
|
||||
(functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && numOfParams != 1) ||
|
||||
((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3)) {
|
||||
/* no parameters or more than one parameter for function */
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
|
||||
|
@ -2294,14 +2306,15 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
|
||||
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
|
||||
// functions can not be applied to tags
|
||||
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta))) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
|
||||
}
|
||||
|
||||
// 2. check if sql function can be applied on this column data type
|
||||
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
|
||||
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
|
||||
|
||||
if (!IS_NUMERIC_TYPE(pSchema->type)) {
|
||||
|
@ -2330,11 +2343,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr);
|
||||
}
|
||||
|
||||
// functions can not be applied to tags
|
||||
if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
|
||||
}
|
||||
|
||||
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), intermediateResSize, false);
|
||||
|
||||
if (functionId == TSDB_FUNC_LEASTSQR) { // set the leastsquares parameters
|
||||
|
@ -2363,9 +2371,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
}
|
||||
|
||||
if (info.precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
tickPerSec /= 1000000;
|
||||
tickPerSec /= TSDB_TICK_PER_SECOND(TSDB_TIME_PRECISION_MICRO);
|
||||
} else if (info.precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
tickPerSec /= 1000;
|
||||
tickPerSec /= TSDB_TICK_PER_SECOND(TSDB_TIME_PRECISION_MILLI);
|
||||
}
|
||||
|
||||
if (tickPerSec <= 0 || tickPerSec < TSDB_TICK_PER_SECOND(info.precision)) {
|
||||
|
@ -2598,7 +2606,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
// set the first column ts for top/bottom query
|
||||
SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||
pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, getNewResColId(pCmd),
|
||||
TSDB_KEYSIZE, false);
|
||||
0, false);
|
||||
tstrncpy(pExpr->base.aliasName, aAggs[TSDB_FUNC_TS].name, sizeof(pExpr->base.aliasName));
|
||||
|
||||
const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX;
|
||||
|
@ -3113,15 +3121,10 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
|
|||
return true;
|
||||
}
|
||||
|
||||
if (pQueryInfo->groupbyExpr.numOfGroupCols != 1) {
|
||||
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0);
|
||||
if (pColIndex->colIndex != TSDB_TBNAME_COLUMN_INDEX) {
|
||||
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
return true;
|
||||
} else {
|
||||
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0);
|
||||
if (pColIndex->colIndex != TSDB_TBNAME_COLUMN_INDEX) {
|
||||
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
} else if (tscIsSessionWindowQuery(pQueryInfo)) {
|
||||
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
|
@ -3675,7 +3678,7 @@ static int32_t checkAndSetJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tS
|
|||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
|
||||
index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
if (!tscColumnExists(pTableMetaInfo->tagColList, index.columnIndex, pTableMetaInfo->pTableMeta->id.uid)) {
|
||||
if (tscColumnExists(pTableMetaInfo->tagColList, pTagSchema1->colId, pTableMetaInfo->pTableMeta->id.uid) < 0) {
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMeta->id.uid, pTagSchema1);
|
||||
|
||||
if (taosArrayGetSize(pTableMetaInfo->tagColList) > 1) {
|
||||
|
@ -3707,7 +3710,7 @@ static int32_t checkAndSetJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tS
|
|||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMeta);
|
||||
if (!tscColumnExists(pTableMetaInfo->tagColList, index.columnIndex, pTableMeta->id.uid)) {
|
||||
if (tscColumnExists(pTableMetaInfo->tagColList, pTagSchema2->colId, pTableMeta->id.uid) < 0) {
|
||||
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMeta->id.uid, pTagSchema2);
|
||||
if (taosArrayGetSize(pTableMetaInfo->tagColList) > 1) {
|
||||
|
@ -7905,6 +7908,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
return code;
|
||||
}
|
||||
|
||||
// updateFunctionInterBuf(pQueryInfo, false);
|
||||
updateLastScanOrderIfNeeded(pQueryInfo);
|
||||
} else {
|
||||
pQueryInfo->command = TSDB_SQL_SELECT;
|
||||
|
@ -8033,6 +8037,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
|
||||
updateLastScanOrderIfNeeded(pQueryInfo);
|
||||
tscFieldInfoUpdateOffset(pQueryInfo);
|
||||
// updateFunctionInterBuf(pQueryInfo, isSTable);
|
||||
|
||||
if ((code = validateFillNode(pCmd, pQueryInfo, pSqlNode)) != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
|
|
@ -795,6 +795,7 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
|
|||
pSqlExpr->colBytes = htons(pExpr->colBytes);
|
||||
pSqlExpr->resType = htons(pExpr->resType);
|
||||
pSqlExpr->resBytes = htons(pExpr->resBytes);
|
||||
pSqlExpr->interBytes = htonl(pExpr->interBytes);
|
||||
pSqlExpr->functionId = htons(pExpr->functionId);
|
||||
pSqlExpr->numOfParams = htons(pExpr->numOfParams);
|
||||
pSqlExpr->resColId = htons(pExpr->resColId);
|
||||
|
|
|
@ -103,13 +103,6 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
|
|||
|
||||
pthread_mutex_lock(&subState->mutex);
|
||||
|
||||
// bool done = allSubqueryDone(pParentSql);
|
||||
// if (done) {
|
||||
// tscDebug("0x%"PRIx64" subquery:0x%"PRIx64",%d all subs already done", pParentSql->self, pSql->self, idx);
|
||||
// pthread_mutex_unlock(&subState->mutex);
|
||||
// return false;
|
||||
// }
|
||||
|
||||
tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index:%d state set to 1", pParentSql->self, pSql->self, idx);
|
||||
subState->states[idx] = 1;
|
||||
|
||||
|
@ -2389,8 +2382,14 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
|
|||
SColumn *pCol = taosArrayGetP(pColList, i);
|
||||
|
||||
if (pCol->info.flist.numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
|
||||
SColumn *p = tscColumnClone(pCol);
|
||||
taosArrayPush(pNewQueryInfo->colList, &p);
|
||||
int32_t index1 = tscColumnExists(pNewQueryInfo->colList, pCol->columnIndex, pCol->tableUid);
|
||||
if (index1 >= 0) {
|
||||
SColumn* x = taosArrayGetP(pNewQueryInfo->colList, index1);
|
||||
tscColumnCopy(x, pCol);
|
||||
} else {
|
||||
SColumn *p = tscColumnClone(pCol);
|
||||
taosArrayPush(pNewQueryInfo->colList, &p);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1332,7 +1332,7 @@ void tscFreeSubobj(SSqlObj* pSql) {
|
|||
tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub);
|
||||
|
||||
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
|
||||
tscDebug("0x%"PRIx64" free sub SqlObj:%p, index:%d", pSql->self, pSql->pSubs[i], i);
|
||||
tscDebug("0x%"PRIx64" free sub SqlObj:0x%"PRIx64", index:%d", pSql->self, pSql->pSubs[i]->self, i);
|
||||
taos_free_result(pSql->pSubs[i]);
|
||||
pSql->pSubs[i] = NULL;
|
||||
}
|
||||
|
@ -1784,7 +1784,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
|
|||
tscSortRemoveDataBlockDupRows(pOneTableBlock);
|
||||
char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
|
||||
|
||||
tscDebug("0x%"PRIx64" name:%s, name:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName),
|
||||
tscDebug("0x%"PRIx64" name:%s, tid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName),
|
||||
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
|
||||
|
||||
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
||||
|
@ -2270,18 +2270,14 @@ int32_t tscExprCopyAll(SArray* dst, const SArray* src, bool deepcopy) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid) {
|
||||
// ignore the tbname columnIndex to be inserted into source list
|
||||
if (columnIndex < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// ignore the tbname columnIndex to be inserted into source list
|
||||
int32_t tscColumnExists(SArray* pColumnList, int32_t columnId, uint64_t uid) {
|
||||
size_t numOfCols = taosArrayGetSize(pColumnList);
|
||||
|
||||
int32_t i = 0;
|
||||
while (i < numOfCols) {
|
||||
SColumn* pCol = taosArrayGetP(pColumnList, i);
|
||||
if ((pCol->columnIndex != columnIndex) || (pCol->tableUid != uid)) {
|
||||
if ((pCol->info.colId != columnId) || (pCol->tableUid != uid)) {
|
||||
++i;
|
||||
continue;
|
||||
} else {
|
||||
|
@ -2290,10 +2286,10 @@ bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid) {
|
|||
}
|
||||
|
||||
if (i >= numOfCols || numOfCols == 0) {
|
||||
return false;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return true;
|
||||
return i;
|
||||
}
|
||||
|
||||
void tscExprAssign(SExprInfo* dst, const SExprInfo* src) {
|
||||
|
@ -2379,13 +2375,7 @@ SColumn* tscColumnClone(const SColumn* src) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
dst->columnIndex = src->columnIndex;
|
||||
dst->tableUid = src->tableUid;
|
||||
dst->info.flist.numOfFilters = src->info.flist.numOfFilters;
|
||||
dst->info.flist.filterInfo = tFilterInfoDup(src->info.flist.filterInfo, src->info.flist.numOfFilters);
|
||||
dst->info.type = src->info.type;
|
||||
dst->info.colId = src->info.colId;
|
||||
dst->info.bytes = src->info.bytes;
|
||||
tscColumnCopy(dst, src);
|
||||
return dst;
|
||||
}
|
||||
|
||||
|
@ -2394,6 +2384,18 @@ static void tscColumnDestroy(SColumn* pCol) {
|
|||
free(pCol);
|
||||
}
|
||||
|
||||
void tscColumnCopy(SColumn* pDest, const SColumn* pSrc) {
|
||||
destroyFilterInfo(&pDest->info.flist);
|
||||
|
||||
pDest->columnIndex = pSrc->columnIndex;
|
||||
pDest->tableUid = pSrc->tableUid;
|
||||
pDest->info.flist.numOfFilters = pSrc->info.flist.numOfFilters;
|
||||
pDest->info.flist.filterInfo = tFilterInfoDup(pSrc->info.flist.filterInfo, pSrc->info.flist.numOfFilters);
|
||||
pDest->info.type = pSrc->info.type;
|
||||
pDest->info.colId = pSrc->info.colId;
|
||||
pDest->info.bytes = pSrc->info.bytes;
|
||||
}
|
||||
|
||||
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid) {
|
||||
assert(src != NULL && dst != NULL);
|
||||
|
||||
|
|
|
@ -100,7 +100,7 @@ extern const int32_t TYPE_BYTES[15];
|
|||
#define TSDB_TIME_PRECISION_MICRO_STR "us"
|
||||
#define TSDB_TIME_PRECISION_NANO_STR "ns"
|
||||
|
||||
#define TSDB_TICK_PER_SECOND(precision) ((precision)==TSDB_TIME_PRECISION_MILLI ? 1e3L : ((precision)==TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L))
|
||||
#define TSDB_TICK_PER_SECOND(precision) ((int64_t)((precision)==TSDB_TIME_PRECISION_MILLI ? 1e3L : ((precision)==TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L)))
|
||||
|
||||
#define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member)
|
||||
#define T_APPEND_MEMBER(dst, ptr, type, member) \
|
||||
|
|
|
@ -1740,16 +1740,22 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t calculateVgroupMsgLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) {
|
||||
static int32_t doGetVgroupInfoLength(char* name) {
|
||||
SSTableObj *pTable = mnodeGetSuperTable(name);
|
||||
int32_t len = 0;
|
||||
if (pTable != NULL && pTable->vgHash != NULL) {
|
||||
len = (taosHashGetSize(pTable->vgHash) * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg));
|
||||
}
|
||||
|
||||
mnodeDecTableRef(pTable);
|
||||
return len;
|
||||
}
|
||||
|
||||
static int32_t getVgroupInfoLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) {
|
||||
int32_t contLen = sizeof(SSTableVgroupRspMsg) + 32 * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg);
|
||||
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||
char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
|
||||
SSTableObj *pTable = mnodeGetSuperTable(stableName);
|
||||
if (pTable != NULL && pTable->vgHash != NULL) {
|
||||
contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg));
|
||||
}
|
||||
|
||||
mnodeDecTableRef(pTable);
|
||||
contLen += doGetVgroupInfoLength(stableName);
|
||||
}
|
||||
|
||||
return contLen;
|
||||
|
@ -1820,7 +1826,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
|
|||
int32_t numOfTable = htonl(pInfo->numOfTables);
|
||||
|
||||
// calculate the required space.
|
||||
int32_t contLen = calculateVgroupMsgLength(pInfo, numOfTable);
|
||||
int32_t contLen = getVgroupInfoLength(pInfo, numOfTable);
|
||||
SSTableVgroupRspMsg *pRsp = rpcMallocCont(contLen);
|
||||
if (pRsp == NULL) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
@ -2860,6 +2866,27 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray* pList, int32_t* totalMallocLen, int32_t numOfVgroupList) {
|
||||
int32_t len = 0;
|
||||
for (int32_t i = 0; i < numOfVgroupList; ++i) {
|
||||
char *name = taosArrayGetP(pList, i);
|
||||
len += doGetVgroupInfoLength(name);
|
||||
}
|
||||
|
||||
if (len + pMultiMeta->contLen > (*totalMallocLen)) {
|
||||
while (len + pMultiMeta->contLen > (*totalMallocLen)) {
|
||||
(*totalMallocLen) *= 2;
|
||||
}
|
||||
|
||||
pMultiMeta = rpcReallocCont(pMultiMeta, *totalMallocLen);
|
||||
if (pMultiMeta == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return pMultiMeta;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
||||
SMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
|
||||
|
||||
|
@ -2950,8 +2977,6 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
char* msg = (char*) pMultiMeta + pMultiMeta->contLen;
|
||||
|
||||
// add the additional super table names that needs the vgroup info
|
||||
for(;t < num; ++t) {
|
||||
taosArrayPush(pList, &nameList[t]);
|
||||
|
@ -2961,6 +2986,13 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
int32_t numOfVgroupList = (int32_t) taosArrayGetSize(pList);
|
||||
pMultiMeta->numOfVgroup = htonl(numOfVgroupList);
|
||||
|
||||
pMultiMeta = ensureMsgBufferSpace(pMultiMeta, pList, &totalMallocLen, numOfVgroupList);
|
||||
if (pMultiMeta == NULL) {
|
||||
code = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
char* msg = (char*) pMultiMeta + pMultiMeta->contLen;
|
||||
for(int32_t i = 0; i < numOfVgroupList; ++i) {
|
||||
char* name = taosArrayGetP(pList, i);
|
||||
|
||||
|
|
|
@ -204,7 +204,7 @@ typedef struct SAggFunctionInfo {
|
|||
bool (*init)(SQLFunctionCtx *pCtx); // setup the execute environment
|
||||
|
||||
void (*xFunction)(SQLFunctionCtx *pCtx); // blocks version function
|
||||
void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version, todo merge with blockwise function
|
||||
// void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version, todo merge with blockwise function
|
||||
|
||||
// finalizer must be called after all xFunction has been executed to generated final result.
|
||||
void (*xFinalize)(SQLFunctionCtx *pCtx);
|
||||
|
|
|
@ -295,7 +295,7 @@ enum OPERATOR_TYPE_E {
|
|||
OP_MultiTableAggregate = 14,
|
||||
OP_MultiTableTimeInterval = 15,
|
||||
OP_DummyInput = 16, //TODO remove it after fully refactor.
|
||||
OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
|
||||
OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
|
||||
OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
|
||||
OP_Filter = 19,
|
||||
OP_Distinct = 20,
|
||||
|
|
|
@ -470,7 +470,7 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
|
|||
//////////////////////// The SELECT statement /////////////////////////////////
|
||||
%type select {SSqlNode*}
|
||||
%destructor select {destroySqlNode($$);}
|
||||
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) windowstate_option(D) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
|
||||
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) windowstate_option(D) fill_opt(F) sliding_opt(S) groupby_opt(P) having_opt(N) orderby_opt(Z) slimit_opt(G) limit_opt(L). {
|
||||
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &D, &S, F, &L, &G, N);
|
||||
}
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -161,7 +161,7 @@ static void setResultOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResul
|
|||
int32_t numOfCols, int32_t* rowCellInfoOffset);
|
||||
|
||||
void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
||||
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
|
||||
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx);
|
||||
|
||||
static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex);
|
||||
|
||||
|
@ -309,7 +309,7 @@ static bool isProjQuery(SQueryAttr *pQueryAttr) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static bool hasNullRv(SColIndex* pColIndex, SDataStatis *pStatis) {
|
||||
static bool hasNull(SColIndex* pColIndex, SDataStatis *pStatis) {
|
||||
if (TSDB_COL_IS_TAG(pColIndex->flag) || TSDB_COL_IS_UD_COL(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
return false;
|
||||
}
|
||||
|
@ -708,12 +708,13 @@ static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
|
|||
static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset,
|
||||
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput) {
|
||||
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
bool hasPrev = pCtx[0].preAggVals.isSet;
|
||||
bool hasAggregates = pCtx[0].preAggVals.isSet;
|
||||
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
pCtx[k].size = forwardStep;
|
||||
pCtx[k].size = forwardStep;
|
||||
pCtx[k].startTs = pWin->skey;
|
||||
|
||||
// keep it temprarily
|
||||
char* start = pCtx[k].pInput;
|
||||
|
||||
int32_t pos = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? offset : offset - (forwardStep - 1);
|
||||
|
@ -725,20 +726,18 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
|
|||
pCtx[k].ptsList = &tsCol[pos];
|
||||
}
|
||||
|
||||
int32_t functionId = pCtx[k].functionId;
|
||||
|
||||
// not a whole block involved in query processing, statistics data can not be used
|
||||
// NOTE: the original value of isSet have been changed here
|
||||
if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) {
|
||||
pCtx[k].preAggVals.isSet = false;
|
||||
}
|
||||
|
||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
||||
aAggs[functionId].xFunction(&pCtx[k]);
|
||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) {
|
||||
aAggs[pCtx[k].functionId].xFunction(&pCtx[k]);
|
||||
}
|
||||
|
||||
// restore it
|
||||
pCtx[k].preAggVals.isSet = hasPrev;
|
||||
pCtx[k].preAggVals.isSet = hasAggregates;
|
||||
pCtx[k].pInput = start;
|
||||
}
|
||||
}
|
||||
|
@ -847,9 +846,6 @@ static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, in
|
|||
}
|
||||
}
|
||||
|
||||
// window start key interpolation
|
||||
|
||||
|
||||
static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock,
|
||||
int32_t rowIndex) {
|
||||
if (pDataBlock == NULL) {
|
||||
|
@ -975,10 +971,9 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
|
|||
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
|
||||
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
|
||||
int32_t functionId = pCtx[k].functionId;
|
||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) {
|
||||
pCtx[k].startTs = startTs;// this can be set during create the struct
|
||||
aAggs[functionId].xFunction(&pCtx[k]);
|
||||
aAggs[pCtx[k].functionId].xFunction(&pCtx[k]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1287,6 +1282,15 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
|
|||
return;
|
||||
}
|
||||
|
||||
int64_t* tsList = NULL;
|
||||
SColumnInfoData* pFirstColData = taosArrayGet(pSDataBlock->pDataBlock, 0);
|
||||
if (pFirstColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
tsList = (int64_t*) pFirstColData->pData;
|
||||
}
|
||||
|
||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||
|
||||
int32_t num = 0;
|
||||
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
|
||||
char* val = ((char*)pColInfoData->pData) + bytes * j;
|
||||
if (isNull(val, type)) {
|
||||
|
@ -1294,33 +1298,59 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
|
|||
}
|
||||
|
||||
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
|
||||
if (pInfo->prevData == NULL || (memcmp(pInfo->prevData, val, bytes) != 0)) {
|
||||
if (pInfo->prevData == NULL) {
|
||||
pInfo->prevData = malloc(bytes);
|
||||
}
|
||||
|
||||
if (pInfo->prevData == NULL) {
|
||||
pInfo->prevData = malloc(bytes);
|
||||
memcpy(pInfo->prevData, val, bytes);
|
||||
num++;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
|
||||
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes);
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
int32_t len = varDataLen(val);
|
||||
if(len == varDataLen(pInfo->prevData) && memcmp(varDataVal(pInfo->prevData), varDataVal(val), len) == 0) {
|
||||
num++;
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t ret =
|
||||
setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, item->groupIndex);
|
||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
|
||||
} else {
|
||||
if (memcmp(pInfo->prevData, val, bytes) == 0) {
|
||||
num++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// todo opt perf
|
||||
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
|
||||
pInfo->binfo.pCtx[k].size = 1;
|
||||
int32_t functionId = pInfo->binfo.pCtx[k].functionId;
|
||||
if (functionNeedToExecute(pRuntimeEnv, &pInfo->binfo.pCtx[k], functionId)) {
|
||||
aAggs[functionId].xFunctionF(&pInfo->binfo.pCtx[k], j);
|
||||
}
|
||||
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
|
||||
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo->prevData,
|
||||
bytes);
|
||||
}
|
||||
|
||||
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes,
|
||||
item->groupIndex);
|
||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput);
|
||||
|
||||
num = 1;
|
||||
memcpy(pInfo->prevData, val, bytes);
|
||||
}
|
||||
|
||||
if (num > 0) {
|
||||
char* val = ((char*)pColInfoData->pData) + bytes * (pSDataBlock->info.rows - num);
|
||||
memcpy(pInfo->prevData, val, bytes);
|
||||
|
||||
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
|
||||
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val,
|
||||
bytes);
|
||||
}
|
||||
|
||||
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes,
|
||||
item->groupIndex);
|
||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1394,9 +1424,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
|
|||
}
|
||||
|
||||
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
|
||||
int64_t v = -1;
|
||||
GET_TYPED_DATA(v, int64_t, type, pData);
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
if (pResultRow->key == NULL) {
|
||||
pResultRow->key = malloc(varDataTLen(pData));
|
||||
varDataCopy(pResultRow->key, pData);
|
||||
|
@ -1404,6 +1432,9 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
|
|||
assert(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0);
|
||||
}
|
||||
} else {
|
||||
int64_t v = -1;
|
||||
GET_TYPED_DATA(v, int64_t, type, pData);
|
||||
|
||||
pResultRow->win.skey = v;
|
||||
pResultRow->win.ekey = v;
|
||||
}
|
||||
|
@ -1419,7 +1450,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasic
|
|||
// not assign result buffer yet, add new result buffer, TODO remove it
|
||||
char* d = pData;
|
||||
int16_t len = bytes;
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
d = varDataVal(pData);
|
||||
len = varDataLen(pData);
|
||||
}
|
||||
|
@ -1461,11 +1492,12 @@ static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pD
|
|||
return -1;
|
||||
}
|
||||
|
||||
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId) {
|
||||
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx) {
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
|
||||
// in case of timestamp column, always generated results.
|
||||
int32_t functionId = pCtx->functionId;
|
||||
if (functionId == TSDB_FUNC_TS) {
|
||||
return true;
|
||||
}
|
||||
|
@ -1505,7 +1537,7 @@ void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColInde
|
|||
pCtx->preAggVals.isSet = false;
|
||||
}
|
||||
|
||||
pCtx->hasNull = hasNullRv(pColIndex, pStatis);
|
||||
pCtx->hasNull = hasNull(pColIndex, pStatis);
|
||||
|
||||
// set the statistics data for primary time stamp column
|
||||
if (pCtx->functionId == TSDB_FUNC_SPREAD && pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
|
@ -3478,6 +3510,7 @@ int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag,
|
|||
return 0;
|
||||
}
|
||||
|
||||
// TODO refactor: this funciton should be merged with setparamForStableStddevColumnData function.
|
||||
void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExprInfo) {
|
||||
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
|
||||
|
@ -4683,8 +4716,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
|
|||
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
|
||||
|
||||
pInfo->resultRowFactor =
|
||||
(int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery,
|
||||
false));
|
||||
(int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false));
|
||||
|
||||
pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx
|
||||
|
||||
|
@ -5256,6 +5288,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|||
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
|
||||
pSDataBlock->info.rows, pOperator->numOfOutput);
|
||||
}
|
||||
|
||||
static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -6268,7 +6301,7 @@ static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static UNUSED_FUNC bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SSqlExpr** pExpr, int32_t numOfOutput,
|
||||
static bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SSqlExpr** pExpr, int32_t numOfOutput,
|
||||
SColumnInfo* pTagCols, void* pMsg) {
|
||||
int32_t numOfTotal = pTableInfo->numOfCols + pTableInfo->numOfTags;
|
||||
if (pTableInfo->numOfCols < 0 || pTableInfo->numOfTags < 0 || numOfTotal > TSDB_MAX_COLUMNS) {
|
||||
|
@ -6453,6 +6486,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
|
|||
|
||||
pExprMsg->resType = htons(pExprMsg->resType);
|
||||
pExprMsg->resBytes = htons(pExprMsg->resBytes);
|
||||
pExprMsg->interBytes = htonl(pExprMsg->interBytes);
|
||||
|
||||
pExprMsg->functionId = htons(pExprMsg->functionId);
|
||||
pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
|
||||
|
@ -6660,41 +6694,41 @@ _cleanup:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t cloneExprFilterInfo(SColumnFilterInfo **dst, SColumnFilterInfo* src, int32_t filterNum) {
|
||||
if (filterNum <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*dst = calloc(filterNum, sizeof(*src));
|
||||
if (*dst == NULL) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
memcpy(*dst, src, sizeof(*src) * filterNum);
|
||||
|
||||
for (int32_t i = 0; i < filterNum; i++) {
|
||||
if ((*dst)[i].filterstr && dst[i]->len > 0) {
|
||||
void *pz = calloc(1, (size_t)(*dst)[i].len + 1);
|
||||
|
||||
if (pz == NULL) {
|
||||
if (i == 0) {
|
||||
free(*dst);
|
||||
} else {
|
||||
freeColumnFilterInfo(*dst, i);
|
||||
}
|
||||
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
memcpy(pz, (void *)src->pz, (size_t)src->len + 1);
|
||||
|
||||
(*dst)[i].pz = (int64_t)pz;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t cloneExprFilterInfo(SColumnFilterInfo **dst, SColumnFilterInfo* src, int32_t filterNum) {
|
||||
if (filterNum <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*dst = calloc(filterNum, sizeof(*src));
|
||||
if (*dst == NULL) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
memcpy(*dst, src, sizeof(*src) * filterNum);
|
||||
|
||||
for (int32_t i = 0; i < filterNum; i++) {
|
||||
if ((*dst)[i].filterstr && dst[i]->len > 0) {
|
||||
void *pz = calloc(1, (size_t)(*dst)[i].len + 1);
|
||||
|
||||
if (pz == NULL) {
|
||||
if (i == 0) {
|
||||
free(*dst);
|
||||
} else {
|
||||
freeColumnFilterInfo(*dst, i);
|
||||
}
|
||||
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
memcpy(pz, (void *)src->pz, (size_t)src->len + 1);
|
||||
|
||||
(*dst)[i].pz = (int64_t)pz;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildArithmeticExprFromMsg(SExprInfo *pExprInfo, void *pQueryMsg) {
|
||||
qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg);
|
||||
|
||||
|
@ -6753,8 +6787,8 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
|
|||
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pExprs[i].base = *pExprMsg[i];
|
||||
memset(pExprs[i].base.param, 0, sizeof(tVariant) * tListLen(pExprs[i].base.param));
|
||||
|
||||
memset(pExprs[i].base.param, 0, sizeof(tVariant) * tListLen(pExprs[i].base.param));
|
||||
for (int32_t j = 0; j < pExprMsg[i]->numOfParams; ++j) {
|
||||
tVariantAssign(&pExprs[i].base.param[j], &pExprMsg[i]->param[j]);
|
||||
}
|
||||
|
@ -6829,6 +6863,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
|
|||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
|
||||
// todo remove it
|
||||
if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].base.resType, &pExprs[i].base.resBytes,
|
||||
&pExprs[i].base.interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) {
|
||||
tfree(pExprs);
|
||||
|
|
|
@ -654,53 +654,91 @@ if $data31 != @20-03-27 05:10:19.000@ then
|
|||
return -1
|
||||
endi
|
||||
|
||||
#sql select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,t1,t2;
|
||||
#if $rows != 40 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#if $data01 != 1.000000000 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data02 != t1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data03 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data04 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#if $data11 != 1.000000000 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data12 != t1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data13 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data14 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#sql select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,t1,t2 limit 1;
|
||||
#if $rows != 2 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#if $data11 != 1.000000000 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data12 != t2 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data13 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data14 != 2 then
|
||||
# return -1
|
||||
#endi
|
||||
print ===============>
|
||||
sql select stddev(c),c from st where t2=1 or t2=2 group by c;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data20 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data21 != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data30 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data31 != 4 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,t1,t2;
|
||||
if $rows != 40 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1.000000000 then
|
||||
return -1
|
||||
endi
|
||||
if $data02 != t1 then
|
||||
return -1
|
||||
endi
|
||||
if $data03 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data04 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 1.000000000 then
|
||||
return -1
|
||||
endi
|
||||
if $data12 != t1 then
|
||||
return -1
|
||||
endi
|
||||
if $data13 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data14 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,t1,t2 limit 1;
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 1.000000000 then
|
||||
return -1
|
||||
endi
|
||||
if $data12 != t2 then
|
||||
return -1
|
||||
endi
|
||||
if $data13 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data14 != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/cfg.sh -n dnode1 -c walLevel -v 0
|
||||
system sh/cfg.sh -n dnode1 -c walLevel -v 1
|
||||
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
|
||||
|
|
|
@ -63,4 +63,3 @@ run general/parser/between_and.sim
|
|||
run general/parser/last_cache.sim
|
||||
run general/parser/nestquery.sim
|
||||
run general/parser/precision_ns.sim
|
||||
|
||||
|
|
Loading…
Reference in New Issue