commit
3d7270edf5
|
@ -34,7 +34,7 @@
|
||||||
# 1.0: all CPU cores are available for query processing [default].
|
# 1.0: all CPU cores are available for query processing [default].
|
||||||
# 0.5: only half of the CPU cores are available for query.
|
# 0.5: only half of the CPU cores are available for query.
|
||||||
# 0.0: only one core available.
|
# 0.0: only one core available.
|
||||||
# tsRatioOfQueryCores 1.0
|
# ratioOfQueryCores 1.0
|
||||||
|
|
||||||
# the last_row/first/last aggregator will not change the original column name in the result fields
|
# the last_row/first/last aggregator will not change the original column name in the result fields
|
||||||
# keepColumnName 0
|
# keepColumnName 0
|
||||||
|
|
|
@ -281,7 +281,7 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(pSql->res.code != TSDB_CODE_SUCCESS);
|
assert(pSql->res.code != TSDB_CODE_SUCCESS);
|
||||||
tscError("%p invoke user specified function due to error occured, code:%s", pSql, tstrerror(pSql->res.code));
|
tscError("%p invoke user specified function due to error occurred, code:%s", pSql, tstrerror(pSql->res.code));
|
||||||
|
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
if (pSql->fp == NULL || pSql->fetchFp == NULL){
|
if (pSql->fp == NULL || pSql->fetchFp == NULL){
|
||||||
|
@ -333,7 +333,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||||
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
|
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
|
||||||
|
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
taosReleaseRef(tscObjRef, pSql->self);
|
taosReleaseRef(tscObjRef, pSql->self);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,7 +86,6 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr
|
||||||
pCtx->outputBytes = pExpr->resBytes;
|
pCtx->outputBytes = pExpr->resBytes;
|
||||||
pCtx->outputType = pExpr->resType;
|
pCtx->outputType = pExpr->resType;
|
||||||
|
|
||||||
pCtx->startOffset = 0;
|
|
||||||
pCtx->size = 1;
|
pCtx->size = 1;
|
||||||
pCtx->hasNull = true;
|
pCtx->hasNull = true;
|
||||||
pCtx->currentStage = MERGE_STAGE;
|
pCtx->currentStage = MERGE_STAGE;
|
||||||
|
|
|
@ -2981,7 +2981,6 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd)
|
||||||
|
|
||||||
STableMeta* pTableMeta = NULL;
|
STableMeta* pTableMeta = NULL;
|
||||||
SSchema* pSchema = NULL;
|
SSchema* pSchema = NULL;
|
||||||
// SSchema s = tGetTbnameColumnSchema();
|
|
||||||
|
|
||||||
int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL;
|
int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL;
|
||||||
|
|
||||||
|
@ -4748,7 +4747,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
|
||||||
int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema* pSchema) {
|
int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema* pSchema) {
|
||||||
const char* msg0 = "only support order by primary timestamp";
|
const char* msg0 = "only support order by primary timestamp";
|
||||||
const char* msg1 = "invalid column name";
|
const char* msg1 = "invalid column name";
|
||||||
const char* msg2 = "only support order by primary timestamp or first tag in groupby clause allowed";
|
const char* msg2 = "order by primary timestamp or first tag in groupby clause allowed";
|
||||||
const char* msg3 = "invalid column in order by clause, only primary timestamp or first tag in groupby clause allowed";
|
const char* msg3 = "invalid column in order by clause, only primary timestamp or first tag in groupby clause allowed";
|
||||||
|
|
||||||
setDefaultOrderInfo(pQueryInfo);
|
setDefaultOrderInfo(pQueryInfo);
|
||||||
|
|
|
@ -451,7 +451,7 @@ int doProcessSql(SSqlObj *pSql) {
|
||||||
|
|
||||||
if (pRes->code != TSDB_CODE_SUCCESS) {
|
if (pRes->code != TSDB_CODE_SUCCESS) {
|
||||||
tscAsyncResultOnError(pSql);
|
tscAsyncResultOnError(pSql);
|
||||||
return pRes->code;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = tscSendMsgToServer(pSql);
|
int32_t code = tscSendMsgToServer(pSql);
|
||||||
|
@ -460,7 +460,7 @@ int doProcessSql(SSqlObj *pSql) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pRes->code = code;
|
pRes->code = code;
|
||||||
tscAsyncResultOnError(pSql);
|
tscAsyncResultOnError(pSql);
|
||||||
return code;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -770,6 +770,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
char n[TSDB_TABLE_FNAME_LEN] = {0};
|
char n[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
tNameExtractFullName(&pTableMetaInfo->name, n);
|
tNameExtractFullName(&pTableMetaInfo->name, n);
|
||||||
|
|
||||||
|
|
||||||
tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
|
tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
|
||||||
pSql, pTableMeta->id.tid, pTableMeta->id.uid, n, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
|
pSql, pTableMeta->id.tid, pTableMeta->id.uid, n, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
|
||||||
pColSchema->name);
|
pColSchema->name);
|
||||||
|
@ -813,6 +814,13 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
|
for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
|
||||||
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
|
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
|
||||||
|
|
||||||
|
// the queried table has been removed and a new table with the same name has already been created already
|
||||||
|
// return error msg
|
||||||
|
if (pExpr->uid != pTableMeta->id.uid) {
|
||||||
|
tscError("%p table has already been destroyed", pSql);
|
||||||
|
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
|
||||||
|
}
|
||||||
|
|
||||||
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
|
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
|
||||||
tscError("%p table schema is not matched with parsed sql", pSql);
|
tscError("%p table schema is not matched with parsed sql", pSql);
|
||||||
return TSDB_CODE_TSC_INVALID_SQL;
|
return TSDB_CODE_TSC_INVALID_SQL;
|
||||||
|
@ -856,6 +864,13 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
|
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
|
||||||
SSqlExpr *pExpr = pField->pSqlExpr;
|
SSqlExpr *pExpr = pField->pSqlExpr;
|
||||||
if (pExpr != NULL) {
|
if (pExpr != NULL) {
|
||||||
|
// the queried table has been removed and a new table with the same name has already been created already
|
||||||
|
// return error msg
|
||||||
|
if (pExpr->uid != pTableMeta->id.uid) {
|
||||||
|
tscError("%p table has already been destroyed", pSql);
|
||||||
|
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
|
||||||
|
}
|
||||||
|
|
||||||
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
|
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
|
||||||
tscError("%p table schema is not matched with parsed sql", pSql);
|
tscError("%p table schema is not matched with parsed sql", pSql);
|
||||||
return TSDB_CODE_TSC_INVALID_SQL;
|
return TSDB_CODE_TSC_INVALID_SQL;
|
||||||
|
|
|
@ -696,7 +696,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tscAsyncResultOnError(pSubObj);
|
tscAsyncResultOnError(pSubObj);
|
||||||
taosReleaseRef(tscObjRef, pSubObj->self);
|
// taosRelekaseRef(tscObjRef, pSubObj->self);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSql->subState.numOfSub <= 0) {
|
if (pSql->subState.numOfSub <= 0) {
|
||||||
|
|
|
@ -1888,14 +1888,31 @@ void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, S
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void destroySup(SFirstRoundQuerySup* pSup) {
|
||||||
|
taosArrayDestroyEx(pSup->pResult, freeInterResult);
|
||||||
|
taosArrayDestroy(pSup->pColsInfo);
|
||||||
|
tfree(pSup);
|
||||||
|
}
|
||||||
|
|
||||||
void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
||||||
SSqlObj* pSql = (SSqlObj*)tres;
|
SSqlObj* pSql = (SSqlObj*)tres;
|
||||||
SSqlRes* pRes = &pSql->res;
|
SSqlRes* pRes = &pSql->res;
|
||||||
|
|
||||||
SFirstRoundQuerySup* pSup = param;
|
SFirstRoundQuerySup* pSup = param;
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
|
||||||
|
|
||||||
if (numOfRows > 0) {
|
SSqlObj* pParent = pSup->pParent;
|
||||||
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||||
|
|
||||||
|
int32_t code = taos_errno(pSql);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
destroySup(pSup);
|
||||||
|
taos_free_result(pSql);
|
||||||
|
pParent->res.code = code;
|
||||||
|
tscAsyncResultOnError(pParent);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (numOfRows > 0) { // the number is not correct for group by column in super table query
|
||||||
TAOS_ROW row = NULL;
|
TAOS_ROW row = NULL;
|
||||||
int32_t numOfCols = taos_field_count(tres);
|
int32_t numOfCols = taos_field_count(tres);
|
||||||
|
|
||||||
|
@ -1905,6 +1922,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
||||||
|
|
||||||
while ((row = taos_fetch_row(tres)) != NULL) {
|
while ((row = taos_fetch_row(tres)) != NULL) {
|
||||||
doAppendData(&interResult, row, numOfCols, pQueryInfo);
|
doAppendData(&interResult, row, numOfCols, pQueryInfo);
|
||||||
|
pSup->numOfRows += 1;
|
||||||
}
|
}
|
||||||
} else { // tagLen > 0
|
} else { // tagLen > 0
|
||||||
char* p = calloc(1, pSup->tagLen);
|
char* p = calloc(1, pSup->tagLen);
|
||||||
|
@ -1916,7 +1934,9 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
for (int32_t i = 0; i < numOfCols && offset < pSup->tagLen; ++i) {
|
for (int32_t i = 0; i < numOfCols && offset < pSup->tagLen; ++i) {
|
||||||
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
|
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
|
||||||
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
|
|
||||||
|
// tag or group by column
|
||||||
|
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag) || pExpr->functionId == TSDB_FUNC_PRJ) {
|
||||||
memcpy(p + offset, row[i], length[i]);
|
memcpy(p + offset, row[i], length[i]);
|
||||||
offset += pExpr->resBytes;
|
offset += pExpr->resBytes;
|
||||||
}
|
}
|
||||||
|
@ -1945,20 +1965,20 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
||||||
taosArrayPush(pSup->pResult, &interResult);
|
taosArrayPush(pSup->pResult, &interResult);
|
||||||
doAppendData(&interResult, row, numOfCols, pQueryInfo);
|
doAppendData(&interResult, row, numOfCols, pQueryInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pSup->numOfRows += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(p);
|
tfree(p);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pSup->numOfRows += numOfRows;
|
|
||||||
if (!pRes->completed) {
|
if (!pRes->completed) {
|
||||||
taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
|
taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the parameters for the second round query process
|
// set the parameters for the second round query process
|
||||||
SSqlObj *pParent = pSup->pParent;
|
|
||||||
SSqlCmd *pPCmd = &pParent->cmd;
|
SSqlCmd *pPCmd = &pParent->cmd;
|
||||||
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(pPCmd, 0);
|
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(pPCmd, 0);
|
||||||
|
|
||||||
|
@ -1984,9 +2004,19 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
|
void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
|
||||||
int32_t c = taos_errno(tres);
|
SFirstRoundQuerySup* pSup = (SFirstRoundQuerySup*) param;
|
||||||
|
|
||||||
|
SSqlObj* pSql = (SSqlObj*) tres;
|
||||||
|
int32_t c = taos_errno(pSql);
|
||||||
|
|
||||||
if (c != TSDB_CODE_SUCCESS) {
|
if (c != TSDB_CODE_SUCCESS) {
|
||||||
// TODO HANDLE ERROR
|
SSqlObj* parent = pSup->pParent;
|
||||||
|
|
||||||
|
destroySup(pSup);
|
||||||
|
taos_free_result(pSql);
|
||||||
|
parent->res.code = code;
|
||||||
|
tscAsyncResultOnError(parent);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
|
taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
|
||||||
|
@ -2020,13 +2050,13 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
|
||||||
pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo);
|
pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo);
|
||||||
if (pNewQueryInfo->groupbyExpr.columnInfo == NULL) {
|
if (pNewQueryInfo->groupbyExpr.columnInfo == NULL) {
|
||||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
// goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) {
|
if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) {
|
||||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
// goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pNewQueryInfo->interval = pQueryInfo->interval;
|
pNewQueryInfo->interval = pQueryInfo->interval;
|
||||||
|
@ -2037,7 +2067,6 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
|
||||||
int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
|
int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
|
||||||
|
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
int32_t numOfTags = 0;
|
|
||||||
for(int32_t i = 0; i < numOfExprs; ++i) {
|
for(int32_t i = 0; i < numOfExprs; ++i) {
|
||||||
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
|
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
|
||||||
if (pExpr->functionId == TSDB_FUNC_TS && pQueryInfo->interval.interval > 0) {
|
if (pExpr->functionId == TSDB_FUNC_TS && pQueryInfo->interval.interval > 0) {
|
||||||
|
@ -2070,7 +2099,25 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
|
||||||
|
|
||||||
SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG);
|
SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG);
|
||||||
p->resColId = pExpr->resColId;
|
p->resColId = pExpr->resColId;
|
||||||
numOfTags += 1;
|
} else if (pExpr->functionId == TSDB_FUNC_PRJ) {
|
||||||
|
int32_t num = (int32_t) taosArrayGetSize(pNewQueryInfo->groupbyExpr.columnInfo);
|
||||||
|
for(int32_t k = 0; k < num; ++k) {
|
||||||
|
SColIndex* pIndex = taosArrayGet(pNewQueryInfo->groupbyExpr.columnInfo, k);
|
||||||
|
if (pExpr->colInfo.colId == pIndex->colId) {
|
||||||
|
pSup->tagLen += pExpr->resBytes;
|
||||||
|
taosArrayPush(pSup->pColsInfo, &pExpr->resColId);
|
||||||
|
|
||||||
|
SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pIndex->colIndex};
|
||||||
|
SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->colInfo.colId);
|
||||||
|
|
||||||
|
//doLimitOutputNormalColOfGroupby
|
||||||
|
SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_PRJ, &colIndex, schema, TSDB_COL_NORMAL);
|
||||||
|
p->numOfParams = 1;
|
||||||
|
p->param[0].i64 = 1;
|
||||||
|
p->param[0].nType = TSDB_DATA_TYPE_INT;
|
||||||
|
p->resColId = pExpr->resColId; // update the result column id
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2087,6 +2134,13 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
|
||||||
|
|
||||||
tscHandleMasterSTableQuery(pNew);
|
tscHandleMasterSTableQuery(pNew);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
destroySup(pSup);
|
||||||
|
taos_free_result(pNew);
|
||||||
|
pSql->res.code = terrno;
|
||||||
|
tscAsyncResultOnError(pSql);
|
||||||
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
||||||
|
@ -2128,7 +2182,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
||||||
tfree(pMemoryBuf);
|
tfree(pMemoryBuf);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("%p retrieved query data from %d vnode(s)", pSql, pState->numOfSub);
|
tscDebug("%p retrieved query data from %d vnode(s)", pSql, pState->numOfSub);
|
||||||
pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);
|
pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);
|
||||||
if (pSql->pSubs == NULL) {
|
if (pSql->pSubs == NULL) {
|
||||||
|
@ -2308,7 +2362,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
|
||||||
taos_free_result(pSql);
|
taos_free_result(pSql);
|
||||||
return ret;
|
return ret;
|
||||||
} else {
|
} else {
|
||||||
pSql->pSubs[trsupport->subqueryIndex] = pSql;
|
pParentSql->pSubs[trsupport->subqueryIndex] = pSql;
|
||||||
tscFreeRetrieveSup(pNew);
|
tscFreeRetrieveSup(pNew);
|
||||||
taos_free_result(pNew);
|
taos_free_result(pNew);
|
||||||
return ret;
|
return ret;
|
||||||
|
|
|
@ -57,7 +57,7 @@ void stmtInsertTest() {
|
||||||
v.ts = start_ts + 20;
|
v.ts = start_ts + 20;
|
||||||
v.k = 123;
|
v.k = 123;
|
||||||
|
|
||||||
char* str = "abc";
|
char str[] = "abc";
|
||||||
uintptr_t len = strlen(str);
|
uintptr_t len = strlen(str);
|
||||||
|
|
||||||
v.a = str;
|
v.a = str;
|
||||||
|
@ -65,7 +65,7 @@ void stmtInsertTest() {
|
||||||
params[2].buffer_length = len;
|
params[2].buffer_length = len;
|
||||||
params[2].buffer = str;
|
params[2].buffer = str;
|
||||||
|
|
||||||
char* nstr = "999";
|
char nstr[] = "999";
|
||||||
uintptr_t len1 = strlen(nstr);
|
uintptr_t len1 = strlen(nstr);
|
||||||
|
|
||||||
v.b = nstr;
|
v.b = nstr;
|
||||||
|
@ -84,18 +84,18 @@ void stmtInsertTest() {
|
||||||
v.ts = start_ts + 30;
|
v.ts = start_ts + 30;
|
||||||
v.k = 911;
|
v.k = 911;
|
||||||
|
|
||||||
str = "92";
|
char str1[] = "92";
|
||||||
len = strlen(str);
|
len = strlen(str1);
|
||||||
|
|
||||||
params[2].length = &len;
|
params[2].length = &len;
|
||||||
params[2].buffer_length = len;
|
params[2].buffer_length = len;
|
||||||
params[2].buffer = str;
|
params[2].buffer = str1;
|
||||||
|
|
||||||
nstr = "1920";
|
char nstr1[] = "1920";
|
||||||
len1 = strlen(nstr);
|
len1 = strlen(nstr1);
|
||||||
|
|
||||||
params[3].buffer_length = len1;
|
params[3].buffer_length = len1;
|
||||||
params[3].buffer = nstr;
|
params[3].buffer = nstr1;
|
||||||
params[3].length = &len1;
|
params[3].length = &len1;
|
||||||
|
|
||||||
taos_stmt_bind_param(stmt, params);
|
taos_stmt_bind_param(stmt, params);
|
||||||
|
@ -103,7 +103,7 @@ void stmtInsertTest() {
|
||||||
|
|
||||||
ret = taos_stmt_execute(stmt);
|
ret = taos_stmt_execute(stmt);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
printf("%p\n", ret);
|
printf("%d\n", ret);
|
||||||
printf("\033[31mfailed to execute insert statement.\033[0m\n");
|
printf("\033[31mfailed to execute insert statement.\033[0m\n");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,7 +97,7 @@ static void freeSCqContext(void *handle) {
|
||||||
}
|
}
|
||||||
SCqContext *pContext = handle;
|
SCqContext *pContext = handle;
|
||||||
pthread_mutex_destroy(&pContext->mutex);
|
pthread_mutex_destroy(&pContext->mutex);
|
||||||
|
|
||||||
taosTmrCleanUp(pContext->tmrCtrl);
|
taosTmrCleanUp(pContext->tmrCtrl);
|
||||||
pContext->tmrCtrl = NULL;
|
pContext->tmrCtrl = NULL;
|
||||||
cDebug("vgId:%d, CQ is closed", pContext->vgId);
|
cDebug("vgId:%d, CQ is closed", pContext->vgId);
|
||||||
|
@ -203,7 +203,7 @@ void cqClose(void *handle) {
|
||||||
pContext->delete = 1;
|
pContext->delete = 1;
|
||||||
int32_t hasCq = 0;
|
int32_t hasCq = 0;
|
||||||
int32_t existLoop = 0;
|
int32_t existLoop = 0;
|
||||||
|
|
||||||
// stop all CQs
|
// stop all CQs
|
||||||
cqStop(pContext);
|
cqStop(pContext);
|
||||||
|
|
||||||
|
@ -223,7 +223,7 @@ void cqClose(void *handle) {
|
||||||
if (pContext->pHead == NULL) {
|
if (pContext->pHead == NULL) {
|
||||||
existLoop = 1;
|
existLoop = 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
pthread_mutex_unlock(&pContext->mutex);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -267,6 +267,7 @@ void cqStop(void *handle) {
|
||||||
if (tsEnableStream == 0) {
|
if (tsEnableStream == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCqContext *pContext = handle;
|
SCqContext *pContext = handle;
|
||||||
cDebug("vgId:%d, stop all CQs", pContext->vgId);
|
cDebug("vgId:%d, stop all CQs", pContext->vgId);
|
||||||
if (pContext->dbConn == NULL || pContext->master == 0) return;
|
if (pContext->dbConn == NULL || pContext->master == 0) return;
|
||||||
|
|
|
@ -84,7 +84,7 @@ extern "C" {
|
||||||
#define TSDB_FUNCSTATE_SO 0x1u // single output
|
#define TSDB_FUNCSTATE_SO 0x1u // single output
|
||||||
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
|
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
|
||||||
#define TSDB_FUNCSTATE_STREAM 0x4u // function avail for stream
|
#define TSDB_FUNCSTATE_STREAM 0x4u // function avail for stream
|
||||||
#define TSDB_FUNCSTATE_STABLE 0x8u // function avail for metric
|
#define TSDB_FUNCSTATE_STABLE 0x8u // function avail for super table
|
||||||
#define TSDB_FUNCSTATE_OF 0x10u // outer forward
|
#define TSDB_FUNCSTATE_OF 0x10u // outer forward
|
||||||
#define TSDB_FUNCSTATE_NEED_TS 0x20u // timestamp is required during query processing
|
#define TSDB_FUNCSTATE_NEED_TS 0x20u // timestamp is required during query processing
|
||||||
#define TSDB_FUNCSTATE_SELECTIVITY 0x40u // selectivity functions, can exists along with tag columns
|
#define TSDB_FUNCSTATE_SELECTIVITY 0x40u // selectivity functions, can exists along with tag columns
|
||||||
|
@ -166,9 +166,8 @@ typedef struct SExtTagsInfo {
|
||||||
|
|
||||||
// sql function runtime context
|
// sql function runtime context
|
||||||
typedef struct SQLFunctionCtx {
|
typedef struct SQLFunctionCtx {
|
||||||
int32_t startOffset; // todo remove it
|
|
||||||
int32_t size; // number of rows
|
int32_t size; // number of rows
|
||||||
void * pInput; //
|
void * pInput; // input data buffer
|
||||||
uint32_t order; // asc|desc
|
uint32_t order; // asc|desc
|
||||||
int16_t inputType;
|
int16_t inputType;
|
||||||
int16_t inputBytes;
|
int16_t inputBytes;
|
||||||
|
@ -184,7 +183,7 @@ typedef struct SQLFunctionCtx {
|
||||||
uint8_t currentStage; // record current running step, default: 0
|
uint8_t currentStage; // record current running step, default: 0
|
||||||
int64_t startTs; // timestamp range of current query when function is executed on a specific data block
|
int64_t startTs; // timestamp range of current query when function is executed on a specific data block
|
||||||
int32_t numOfParams;
|
int32_t numOfParams;
|
||||||
tVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param */
|
tVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
|
||||||
int64_t *ptsList; // corresponding timestamp array list
|
int64_t *ptsList; // corresponding timestamp array list
|
||||||
void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
|
void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
|
||||||
SQLPreAggVal preAggVals;
|
SQLPreAggVal preAggVals;
|
||||||
|
@ -228,7 +227,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
||||||
#define IS_SINGLEOUTPUT(x) (((x)&TSDB_FUNCSTATE_SO) != 0)
|
#define IS_SINGLEOUTPUT(x) (((x)&TSDB_FUNCSTATE_SO) != 0)
|
||||||
#define IS_OUTER_FORWARD(x) (((x)&TSDB_FUNCSTATE_OF) != 0)
|
#define IS_OUTER_FORWARD(x) (((x)&TSDB_FUNCSTATE_OF) != 0)
|
||||||
|
|
||||||
/* determine the real data need to calculated the result */
|
// determine the real data need to calculated the result
|
||||||
enum {
|
enum {
|
||||||
BLK_DATA_NO_NEEDED = 0x0,
|
BLK_DATA_NO_NEEDED = 0x0,
|
||||||
BLK_DATA_STATIS_NEEDED = 0x1,
|
BLK_DATA_STATIS_NEEDED = 0x1,
|
||||||
|
|
|
@ -33,6 +33,36 @@ struct SColumnFilterElem;
|
||||||
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, const char* val1, const char* val2, int16_t type);
|
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, const char* val1, const char* val2, int16_t type);
|
||||||
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
|
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
|
||||||
|
|
||||||
|
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
|
||||||
|
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
|
||||||
|
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
|
||||||
|
|
||||||
|
#define SET_STABLE_QUERY_OVER(_q) ((_q)->tableIndex = (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
|
||||||
|
#define IS_STASBLE_QUERY_OVER(_q) ((_q)->tableIndex >= (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
|
||||||
|
|
||||||
|
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
|
||||||
|
|
||||||
|
enum {
|
||||||
|
// when query starts to execute, this status will set
|
||||||
|
QUERY_NOT_COMPLETED = 0x1u,
|
||||||
|
|
||||||
|
/* result output buffer is full, current query is paused.
|
||||||
|
* this status is only exist in group-by clause and diff/add/division/multiply/ query.
|
||||||
|
*/
|
||||||
|
QUERY_RESBUF_FULL = 0x2u,
|
||||||
|
|
||||||
|
/* query is over
|
||||||
|
* 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
|
||||||
|
* 2. when all data within queried time window, it is also denoted as query_completed
|
||||||
|
*/
|
||||||
|
QUERY_COMPLETED = 0x4u,
|
||||||
|
|
||||||
|
/* when the result is not completed return to client, this status will be
|
||||||
|
* usually used in case of interval query with interpolation option
|
||||||
|
*/
|
||||||
|
QUERY_OVER = 0x8u,
|
||||||
|
};
|
||||||
|
|
||||||
typedef struct SResultRowPool {
|
typedef struct SResultRowPool {
|
||||||
int32_t elemSize;
|
int32_t elemSize;
|
||||||
int32_t blockSize;
|
int32_t blockSize;
|
||||||
|
@ -66,7 +96,8 @@ typedef struct SResultRow {
|
||||||
} SResultRow;
|
} SResultRow;
|
||||||
|
|
||||||
typedef struct SGroupResInfo {
|
typedef struct SGroupResInfo {
|
||||||
int32_t rowId;
|
int32_t totalGroup;
|
||||||
|
int32_t currentGroup;
|
||||||
int32_t index;
|
int32_t index;
|
||||||
SArray* pRows; // SArray<SResultRow*>
|
SArray* pRows; // SArray<SResultRow*>
|
||||||
} SGroupResInfo;
|
} SGroupResInfo;
|
||||||
|
@ -112,7 +143,7 @@ typedef struct STableQueryInfo {
|
||||||
STimeWindow win;
|
STimeWindow win;
|
||||||
STSCursor cur;
|
STSCursor cur;
|
||||||
void* pTable; // for retrieve the page id list
|
void* pTable; // for retrieve the page id list
|
||||||
SResultRowInfo windowResInfo;
|
SResultRowInfo resInfo;
|
||||||
} STableQueryInfo;
|
} STableQueryInfo;
|
||||||
|
|
||||||
typedef struct SQueryCostInfo {
|
typedef struct SQueryCostInfo {
|
||||||
|
@ -193,7 +224,7 @@ typedef struct SQueryRuntimeEnv {
|
||||||
uint16_t* offset;
|
uint16_t* offset;
|
||||||
uint16_t scanFlag; // denotes reversed scan of data or not
|
uint16_t scanFlag; // denotes reversed scan of data or not
|
||||||
SFillInfo* pFillInfo;
|
SFillInfo* pFillInfo;
|
||||||
SResultRowInfo windowResInfo;
|
SResultRowInfo resultRowInfo;
|
||||||
|
|
||||||
SQueryCostInfo summary;
|
SQueryCostInfo summary;
|
||||||
void* pQueryHandle;
|
void* pQueryHandle;
|
||||||
|
@ -204,7 +235,8 @@ typedef struct SQueryRuntimeEnv {
|
||||||
bool hasTagResults; // if there are tag values in final result or not
|
bool hasTagResults; // if there are tag values in final result or not
|
||||||
bool timeWindowInterpo;// if the time window start/end required interpolation
|
bool timeWindowInterpo;// if the time window start/end required interpolation
|
||||||
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
|
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
|
||||||
bool queryBlockDist; // if query data block distribution
|
bool queryBlockDist; // if query data block distribution
|
||||||
|
bool stabledev; // super table stddev query
|
||||||
int32_t interBufSize; // intermediate buffer sizse
|
int32_t interBufSize; // intermediate buffer sizse
|
||||||
int32_t prevGroupId; // previous executed group id
|
int32_t prevGroupId; // previous executed group id
|
||||||
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||||
|
@ -257,4 +289,51 @@ typedef struct SQInfo {
|
||||||
char* sql; // query sql string
|
char* sql; // query sql string
|
||||||
} SQInfo;
|
} SQInfo;
|
||||||
|
|
||||||
|
typedef struct SQueryParam {
|
||||||
|
char *sql;
|
||||||
|
char *tagCond;
|
||||||
|
char *tbnameCond;
|
||||||
|
char *prevResult;
|
||||||
|
SArray *pTableIdList;
|
||||||
|
SSqlFuncMsg **pExprMsg;
|
||||||
|
SSqlFuncMsg **pSecExprMsg;
|
||||||
|
SExprInfo *pExprs;
|
||||||
|
SExprInfo *pSecExprs;
|
||||||
|
|
||||||
|
SColIndex *pGroupColIndex;
|
||||||
|
SColumnInfo *pTagColumnInfo;
|
||||||
|
SSqlGroupbyExpr *pGroupbyExpr;
|
||||||
|
} SQueryParam;
|
||||||
|
|
||||||
|
void freeParam(SQueryParam *param);
|
||||||
|
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
|
||||||
|
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
|
||||||
|
SColumnInfo* pTagCols);
|
||||||
|
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
|
||||||
|
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
|
||||||
|
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql);
|
||||||
|
int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, SQueryParam* param, bool isSTable);
|
||||||
|
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
|
||||||
|
|
||||||
|
bool isQueryKilled(SQInfo *pQInfo);
|
||||||
|
int32_t checkForQueryBuf(size_t numOfTables);
|
||||||
|
bool doBuildResCheck(SQInfo* pQInfo);
|
||||||
|
void setQueryStatus(SQuery *pQuery, int8_t status);
|
||||||
|
|
||||||
|
bool onlyQueryTags(SQuery* pQuery);
|
||||||
|
void buildTagQueryResult(SQInfo *pQInfo);
|
||||||
|
void stableQueryImpl(SQInfo *pQInfo);
|
||||||
|
void buildTableBlockDistResult(SQInfo *pQInfo);
|
||||||
|
void tableQueryImpl(SQInfo *pQInfo);
|
||||||
|
bool isValidQInfo(void *param);
|
||||||
|
|
||||||
|
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data);
|
||||||
|
|
||||||
|
size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
|
||||||
|
void setQueryKilled(SQInfo *pQInfo);
|
||||||
|
void queryCostStatis(SQInfo *pQInfo);
|
||||||
|
void freeQInfo(SQInfo *pQInfo);
|
||||||
|
|
||||||
|
int32_t getMaximumIdleDurationSec();
|
||||||
|
|
||||||
#endif // TDENGINE_QUERYEXECUTOR_H
|
#endif // TDENGINE_QUERYEXECUTOR_H
|
||||||
|
|
|
@ -85,4 +85,12 @@ void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen);
|
||||||
SArray* interResFromBinary(const char* data, int32_t len);
|
SArray* interResFromBinary(const char* data, int32_t len);
|
||||||
void freeInterResult(void* param);
|
void freeInterResult(void* param);
|
||||||
|
|
||||||
|
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo, int32_t offset);
|
||||||
|
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
|
||||||
|
bool hasRemainData(SGroupResInfo* pGroupResInfo);
|
||||||
|
bool incNextGroup(SGroupResInfo* pGroupResInfo);
|
||||||
|
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
|
||||||
|
|
||||||
|
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo);
|
||||||
|
|
||||||
#endif // TDENGINE_QUERYUTIL_H
|
#endif // TDENGINE_QUERYUTIL_H
|
||||||
|
|
|
@ -26,10 +26,12 @@
|
||||||
#include "qTsbuf.h"
|
#include "qTsbuf.h"
|
||||||
#include "queryLog.h"
|
#include "queryLog.h"
|
||||||
|
|
||||||
#define GET_INPUT_DATA_LIST(x) (((char *)((x)->pInput)) + ((x)->startOffset) * ((x)->inputBytes))
|
//#define GET_INPUT_DATA_LIST(x) (((char *)((x)->pInput)) + ((x)->startOffset) * ((x)->inputBytes))
|
||||||
|
#define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
|
||||||
#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes)
|
#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes)
|
||||||
|
|
||||||
#define GET_TS_LIST(x) ((TSKEY*)&((x)->ptsList[(x)->startOffset]))
|
//#define GET_TS_LIST(x) ((TSKEY*)&((x)->ptsList[(x)->startOffset]))
|
||||||
|
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
|
||||||
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
|
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
|
||||||
|
|
||||||
#define GET_TRUE_DATA_TYPE() \
|
#define GET_TRUE_DATA_TYPE() \
|
||||||
|
@ -379,11 +381,7 @@ static bool function_setup(SQLFunctionCtx *pCtx) {
|
||||||
static void function_finalizer(SQLFunctionCtx *pCtx) {
|
static void function_finalizer(SQLFunctionCtx *pCtx) {
|
||||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
if (pResInfo->hasResult != DATA_SET_FLAG) {
|
if (pResInfo->hasResult != DATA_SET_FLAG) {
|
||||||
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
|
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
|
||||||
setVardataNull(pCtx->pOutput, pCtx->outputType);
|
|
||||||
} else {
|
|
||||||
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
doFinalizer(pCtx);
|
doFinalizer(pCtx);
|
||||||
|
@ -414,10 +412,7 @@ static void count_function(SQLFunctionCtx *pCtx) {
|
||||||
numOfElem += 1;
|
numOfElem += 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/*
|
//when counting on the primary time stamp column and no statistics data is presented, use the size value directly.
|
||||||
* when counting on the primary time stamp column and no statistics data is provided,
|
|
||||||
* simple use the size value
|
|
||||||
*/
|
|
||||||
numOfElem = pCtx->size;
|
numOfElem = pCtx->size;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -944,9 +939,9 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
|
||||||
*
|
*
|
||||||
* The following codes of 3 lines will be removed later.
|
* The following codes of 3 lines will be removed later.
|
||||||
*/
|
*/
|
||||||
if (index < 0 || index >= pCtx->size + pCtx->startOffset) {
|
// if (index < 0 || index >= pCtx->size + pCtx->startOffset) {
|
||||||
index = 0;
|
// index = 0;
|
||||||
}
|
// }
|
||||||
|
|
||||||
// the index is the original position, not the relative position
|
// the index is the original position, not the relative position
|
||||||
key = pCtx->ptsList[index];
|
key = pCtx->ptsList[index];
|
||||||
|
@ -1637,6 +1632,97 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) {
|
||||||
memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)), sizeof(SAvgInfo));
|
memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)), sizeof(SAvgInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void stddev_dst_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||||
|
void *pData = GET_INPUT_DATA(pCtx, index);
|
||||||
|
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// the second stage to calculate standard deviation
|
||||||
|
SStddevdstInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
double *retVal = &pStd->res;
|
||||||
|
|
||||||
|
// all data are null, no need to proceed
|
||||||
|
SArray* resList = (SArray*) pCtx->param[0].pz;
|
||||||
|
if (resList == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// find the correct group average results according to the tag value
|
||||||
|
int32_t len = (int32_t) taosArrayGetSize(resList);
|
||||||
|
assert(len > 0);
|
||||||
|
|
||||||
|
double avg = 0;
|
||||||
|
if (len == 1) {
|
||||||
|
SResPair* p = taosArrayGet(resList, 0);
|
||||||
|
avg = p->avg;
|
||||||
|
} else { // todo opt performance by using iterator since the timestamp lsit is matched with the output result
|
||||||
|
SResPair* p = bsearch(&pCtx->startTs, resList->pData, len, sizeof(SResPair), tsCompare);
|
||||||
|
assert(p != NULL);
|
||||||
|
|
||||||
|
avg = p->avg;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t num = 0;
|
||||||
|
switch (pCtx->inputType) {
|
||||||
|
case TSDB_DATA_TYPE_INT: {
|
||||||
|
for (int32_t i = 0; i < pCtx->size; ++i) {
|
||||||
|
if (pCtx->hasNull && isNull((const char*) (&((int32_t *)pData)[i]), pCtx->inputType)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
num += 1;
|
||||||
|
*retVal += POW2(((int32_t *)pData)[i] - avg);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_FLOAT: {
|
||||||
|
LOOP_STDDEV_IMPL(float, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE: {
|
||||||
|
LOOP_STDDEV_IMPL(double, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_TINYINT: {
|
||||||
|
LOOP_STDDEV_IMPL(int8_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT: {
|
||||||
|
LOOP_STDDEV_IMPL(int8_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT: {
|
||||||
|
LOOP_STDDEV_IMPL(int16_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT: {
|
||||||
|
LOOP_STDDEV_IMPL(uint16_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_UINT: {
|
||||||
|
LOOP_STDDEV_IMPL(uint32_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_BIGINT: {
|
||||||
|
LOOP_STDDEV_IMPL(int64_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT: {
|
||||||
|
LOOP_STDDEV_IMPL(uint64_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
qError("stddev function not support data type:%d", pCtx->inputType);
|
||||||
|
}
|
||||||
|
|
||||||
|
pStd->num += num;
|
||||||
|
SET_VAL(pCtx, num, 1);
|
||||||
|
|
||||||
|
// copy to the final output buffer for super table
|
||||||
|
memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)), sizeof(SAvgInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void stddev_dst_merge(SQLFunctionCtx *pCtx) {
|
static void stddev_dst_merge(SQLFunctionCtx *pCtx) {
|
||||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
SStddevdstInfo* pRes = GET_ROWCELL_INTERBUF(pResInfo);
|
SStddevdstInfo* pRes = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
@ -3489,9 +3575,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) {
|
||||||
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
|
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
|
||||||
|
|
||||||
arithmeticTreeTraverse(sas->pArithExpr->pExpr, pCtx->size, pCtx->pOutput, sas, pCtx->order, getArithColumnData);
|
arithmeticTreeTraverse(sas->pArithExpr->pExpr, pCtx->size, pCtx->pOutput, sas, pCtx->order, getArithColumnData);
|
||||||
|
|
||||||
pCtx->pOutput += pCtx->outputBytes * pCtx->size;
|
pCtx->pOutput += pCtx->outputBytes * pCtx->size;
|
||||||
pCtx->param[1].pz = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||||
|
@ -3979,6 +4063,12 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
|
||||||
} else {
|
} else {
|
||||||
assignVal(pCtx->pOutput, pCtx->start.ptr, pCtx->outputBytes, pCtx->inputType);
|
assignVal(pCtx->pOutput, pCtx->start.ptr, pCtx->outputBytes, pCtx->inputType);
|
||||||
}
|
}
|
||||||
|
} else if (type == TSDB_FILL_NEXT) {
|
||||||
|
if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) {
|
||||||
|
SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, pCtx->end.val);
|
||||||
|
} else {
|
||||||
|
assignVal(pCtx->pOutput, pCtx->end.ptr, pCtx->outputBytes, pCtx->inputType);
|
||||||
|
}
|
||||||
} else if (type == TSDB_FILL_LINEAR) {
|
} else if (type == TSDB_FILL_LINEAR) {
|
||||||
SPoint point1 = {.key = pCtx->start.key, .val = &pCtx->start.val};
|
SPoint point1 = {.key = pCtx->start.key, .val = &pCtx->start.val};
|
||||||
SPoint point2 = {.key = pCtx->end.key, .val = &pCtx->end.val};
|
SPoint point2 = {.key = pCtx->end.key, .val = &pCtx->end.val};
|
||||||
|
@ -4838,7 +4928,7 @@ SAggFunctionInfo aAggs[] = {{
|
||||||
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STABLE,
|
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STABLE,
|
||||||
function_setup,
|
function_setup,
|
||||||
stddev_dst_function,
|
stddev_dst_function,
|
||||||
noop2,
|
stddev_dst_function_f,
|
||||||
no_next_step,
|
no_next_step,
|
||||||
stddev_dst_finalizer,
|
stddev_dst_finalizer,
|
||||||
stddev_dst_merge,
|
stddev_dst_merge,
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -20,6 +20,14 @@
|
||||||
#include "qExecutor.h"
|
#include "qExecutor.h"
|
||||||
#include "qUtil.h"
|
#include "qUtil.h"
|
||||||
#include "tbuffer.h"
|
#include "tbuffer.h"
|
||||||
|
#include "tlosertree.h"
|
||||||
|
#include "queryLog.h"
|
||||||
|
|
||||||
|
typedef struct SCompSupporter {
|
||||||
|
STableQueryInfo **pTableQueryInfo;
|
||||||
|
int32_t *rowIndex;
|
||||||
|
int32_t order;
|
||||||
|
} SCompSupporter;
|
||||||
|
|
||||||
int32_t getOutputInterResultBufSize(SQuery* pQuery) {
|
int32_t getOutputInterResultBufSize(SQuery* pQuery) {
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
|
@ -322,4 +330,243 @@ void freeInterResult(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pResult->pResult);
|
taosArrayDestroy(pResult->pResult);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
|
||||||
|
assert(pGroupResInfo != NULL);
|
||||||
|
|
||||||
|
taosArrayDestroy(pGroupResInfo->pRows);
|
||||||
|
pGroupResInfo->pRows = NULL;
|
||||||
|
pGroupResInfo->index = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo, int32_t offset) {
|
||||||
|
if (pGroupResInfo->pRows != NULL) {
|
||||||
|
taosArrayDestroy(pGroupResInfo->pRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pResult, pResultInfo->size, POINTER_BYTES);
|
||||||
|
pGroupResInfo->index = offset;
|
||||||
|
|
||||||
|
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
bool hasRemainData(SGroupResInfo* pGroupResInfo) {
|
||||||
|
if (pGroupResInfo->pRows == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pGroupResInfo->index < taosArrayGetSize(pGroupResInfo->pRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool incNextGroup(SGroupResInfo* pGroupResInfo) {
|
||||||
|
return (++pGroupResInfo->currentGroup) < pGroupResInfo->totalGroup;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
|
||||||
|
assert(pGroupResInfo != NULL);
|
||||||
|
if (pGroupResInfo->pRows == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (int32_t) taosArrayGetSize(pGroupResInfo->pRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow) {
|
||||||
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||||
|
int32_t functionId = pQuery->pExpr1[j].base.functionId;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ts, tag, tagprj function can not decide the output number of current query
|
||||||
|
* the number of output result is decided by main output
|
||||||
|
*/
|
||||||
|
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SResultRowCellInfo *pResultInfo = getResultCell(pRuntimeEnv, pResultRow, j);
|
||||||
|
assert(pResultInfo != NULL);
|
||||||
|
|
||||||
|
if (pResultInfo->numOfRes > 0) {
|
||||||
|
return pResultInfo->numOfRes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) {
|
||||||
|
int32_t left = *(int32_t *)pLeft;
|
||||||
|
int32_t right = *(int32_t *)pRight;
|
||||||
|
|
||||||
|
SCompSupporter * supporter = (SCompSupporter *)param;
|
||||||
|
|
||||||
|
int32_t leftPos = supporter->rowIndex[left];
|
||||||
|
int32_t rightPos = supporter->rowIndex[right];
|
||||||
|
|
||||||
|
/* left source is exhausted */
|
||||||
|
if (leftPos == -1) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* right source is exhausted*/
|
||||||
|
if (rightPos == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
STableQueryInfo** pList = supporter->pTableQueryInfo;
|
||||||
|
|
||||||
|
SResultRowInfo *pWindowResInfo1 = &(pList[left]->resInfo);
|
||||||
|
SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos);
|
||||||
|
TSKEY leftTimestamp = pWindowRes1->win.skey;
|
||||||
|
|
||||||
|
SResultRowInfo *pWindowResInfo2 = &(pList[right]->resInfo);
|
||||||
|
SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos);
|
||||||
|
TSKEY rightTimestamp = pWindowRes2->win.skey;
|
||||||
|
|
||||||
|
if (leftTimestamp == rightTimestamp) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (supporter->order == TSDB_ORDER_ASC) {
|
||||||
|
return (leftTimestamp > rightTimestamp)? 1:-1;
|
||||||
|
} else {
|
||||||
|
return (leftTimestamp < rightTimestamp)? 1:-1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, void* qinfo) {
|
||||||
|
bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQuery);
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
int32_t *posList = NULL;
|
||||||
|
SLoserTreeInfo *pTree = NULL;
|
||||||
|
STableQueryInfo **pTableQueryInfoList = NULL;
|
||||||
|
|
||||||
|
size_t size = taosArrayGetSize(pTableList);
|
||||||
|
if (pGroupResInfo->pRows == NULL) {
|
||||||
|
pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
posList = calloc(size, sizeof(int32_t));
|
||||||
|
pTableQueryInfoList = malloc(POINTER_BYTES * size);
|
||||||
|
|
||||||
|
if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) {
|
||||||
|
qError("QInfo:%p failed alloc memory", qinfo);
|
||||||
|
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfTables = 0;
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
STableQueryInfo *item = taosArrayGetP(pTableList, i);
|
||||||
|
if (item->resInfo.size > 0) {
|
||||||
|
pTableQueryInfoList[numOfTables++] = item;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// there is no data in current group
|
||||||
|
// no need to merge results since only one table in each group
|
||||||
|
if (numOfTables == 0) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQuery->order.order};
|
||||||
|
|
||||||
|
int32_t ret = tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t lastTimestamp = ascQuery? INT64_MIN:INT64_MAX;
|
||||||
|
int64_t startt = taosGetTimestampMs();
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
int32_t tableIndex = pTree->pNode[0].index;
|
||||||
|
|
||||||
|
SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo;
|
||||||
|
SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]);
|
||||||
|
|
||||||
|
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes);
|
||||||
|
if (num <= 0) {
|
||||||
|
cs.rowIndex[tableIndex] += 1;
|
||||||
|
|
||||||
|
if (cs.rowIndex[tableIndex] >= pWindowResInfo->size) {
|
||||||
|
cs.rowIndex[tableIndex] = -1;
|
||||||
|
if (--numOfTables == 0) { // all input sources are exhausted
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
assert((pWindowRes->win.skey >= lastTimestamp && ascQuery) || (pWindowRes->win.skey <= lastTimestamp && !ascQuery));
|
||||||
|
|
||||||
|
if (pWindowRes->win.skey != lastTimestamp) {
|
||||||
|
taosArrayPush(pGroupResInfo->pRows, &pWindowRes);
|
||||||
|
pWindowRes->numOfRows = (uint32_t) num;
|
||||||
|
}
|
||||||
|
|
||||||
|
lastTimestamp = pWindowRes->win.skey;
|
||||||
|
|
||||||
|
// move to the next row of current entry
|
||||||
|
if ((++cs.rowIndex[tableIndex]) >= pWindowResInfo->size) {
|
||||||
|
cs.rowIndex[tableIndex] = -1;
|
||||||
|
|
||||||
|
// all input sources are exhausted
|
||||||
|
if ((--numOfTables) == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tLoserTreeAdjust(pTree, tableIndex + pTree->numOfEntries);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t endt = taosGetTimestampMs();
|
||||||
|
|
||||||
|
qDebug("QInfo:%p result merge completed for group:%d, elapsed time:%" PRId64 " ms", qinfo,
|
||||||
|
pGroupResInfo->currentGroup, endt - startt);
|
||||||
|
|
||||||
|
_end:
|
||||||
|
tfree(pTableQueryInfoList);
|
||||||
|
tfree(posList);
|
||||||
|
tfree(pTree);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo) {
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
|
while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
|
||||||
|
SArray *group = GET_TABLEGROUP(pQInfo, pGroupResInfo->currentGroup);
|
||||||
|
|
||||||
|
int32_t ret = mergeIntoGroupResultImpl(&pQInfo->runtimeEnv, pGroupResInfo, group, pQInfo);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
// this group generates at least one result, return results
|
||||||
|
if (taosArrayGetSize(pGroupResInfo->pRows) > 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("QInfo:%p no result in group %d, continue", pQInfo, pGroupResInfo->currentGroup);
|
||||||
|
cleanupGroupResInfo(pGroupResInfo);
|
||||||
|
incNextGroup(pGroupResInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pGroupResInfo->currentGroup >= pGroupResInfo->totalGroup && !hasRemainData(pGroupResInfo)) {
|
||||||
|
SET_STABLE_QUERY_OVER(pQInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t elapsedTime = taosGetTimestampUs() - st;
|
||||||
|
qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pQInfo,
|
||||||
|
pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
|
||||||
|
|
||||||
|
pQInfo->runtimeEnv.summary.firstStageMergeTime += elapsedTime;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,542 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "qFill.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
|
#include "tcache.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
|
||||||
|
#include "exception.h"
|
||||||
|
#include "hash.h"
|
||||||
|
#include "texpr.h"
|
||||||
|
#include "qExecutor.h"
|
||||||
|
#include "qResultbuf.h"
|
||||||
|
#include "qUtil.h"
|
||||||
|
#include "query.h"
|
||||||
|
#include "queryLog.h"
|
||||||
|
#include "tlosertree.h"
|
||||||
|
#include "ttype.h"
|
||||||
|
#include "tcompare.h"
|
||||||
|
|
||||||
|
typedef struct SQueryMgmt {
|
||||||
|
pthread_mutex_t lock;
|
||||||
|
SCacheObj *qinfoPool; // query handle pool
|
||||||
|
int32_t vgId;
|
||||||
|
bool closed;
|
||||||
|
} SQueryMgmt;
|
||||||
|
|
||||||
|
static void queryMgmtKillQueryFn(void* handle) {
|
||||||
|
void** fp = (void**)handle;
|
||||||
|
qKillQuery(*fp);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void freeqinfoFn(void *qhandle) {
|
||||||
|
void** handle = qhandle;
|
||||||
|
if (handle == NULL || *handle == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
qKillQuery(*handle);
|
||||||
|
qDestroyQueryInfo(*handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
void freeParam(SQueryParam *param) {
|
||||||
|
tfree(param->sql);
|
||||||
|
tfree(param->tagCond);
|
||||||
|
tfree(param->tbnameCond);
|
||||||
|
tfree(param->pTableIdList);
|
||||||
|
tfree(param->pExprMsg);
|
||||||
|
tfree(param->pSecExprMsg);
|
||||||
|
tfree(param->pExprs);
|
||||||
|
tfree(param->pSecExprs);
|
||||||
|
tfree(param->pGroupColIndex);
|
||||||
|
tfree(param->pTagColumnInfo);
|
||||||
|
tfree(param->pGroupbyExpr);
|
||||||
|
tfree(param->prevResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo) {
|
||||||
|
assert(pQueryMsg != NULL && tsdb != NULL);
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
SQueryParam param = {0};
|
||||||
|
code = convertQueryMsg(pQueryMsg, ¶m);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _over;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pQueryMsg->numOfTables <= 0) {
|
||||||
|
qError("Invalid number of tables to query, numOfTables:%d", pQueryMsg->numOfTables);
|
||||||
|
code = TSDB_CODE_QRY_INVALID_MSG;
|
||||||
|
goto _over;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (param.pTableIdList == NULL || taosArrayGetSize(param.pTableIdList) == 0) {
|
||||||
|
qError("qmsg:%p, SQueryTableMsg wrong format", pQueryMsg);
|
||||||
|
code = TSDB_CODE_QRY_INVALID_MSG;
|
||||||
|
goto _over;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((code = createQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->numOfOutput, ¶m.pExprs, param.pExprMsg, param.pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _over;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (param.pSecExprMsg != NULL) {
|
||||||
|
if ((code = createQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, ¶m.pSecExprs, param.pSecExprMsg, param.pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _over;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
param.pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, param.pGroupColIndex, &code);
|
||||||
|
if ((param.pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _over;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isSTableQuery = false;
|
||||||
|
STableGroupInfo tableGroupInfo = {0};
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
|
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) {
|
||||||
|
STableIdInfo *id = taosArrayGet(param.pTableIdList, 0);
|
||||||
|
|
||||||
|
qDebug("qmsg:%p query normal table, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
|
||||||
|
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, pQueryMsg->window.skey, &tableGroupInfo)) != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _over;
|
||||||
|
}
|
||||||
|
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) {
|
||||||
|
isSTableQuery = true;
|
||||||
|
|
||||||
|
// also note there's possibility that only one table in the super table
|
||||||
|
if (!TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY)) {
|
||||||
|
STableIdInfo *id = taosArrayGet(param.pTableIdList, 0);
|
||||||
|
|
||||||
|
// group by normal column, do not pass the group by condition to tsdb to group table into different group
|
||||||
|
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols;
|
||||||
|
if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(param.pGroupColIndex->flag)) {
|
||||||
|
numOfGroupByCols = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("qmsg:%p query stable, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
|
||||||
|
code = tsdbQuerySTableByTagCond(tsdb, id->uid, pQueryMsg->window.skey, param.tagCond, pQueryMsg->tagCondLen,
|
||||||
|
pQueryMsg->tagNameRelType, param.tbnameCond, &tableGroupInfo, param.pGroupColIndex, numOfGroupByCols);
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("qmsg:%p failed to query stable, reason: %s", pQueryMsg, tstrerror(code));
|
||||||
|
goto _over;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
code = tsdbGetTableGroupFromIdList(tsdb, param.pTableIdList, &tableGroupInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _over;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("qmsg:%p query on %" PRIzu " tables in one group from client", pQueryMsg, tableGroupInfo.numOfTables);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t el = taosGetTimestampUs() - st;
|
||||||
|
qDebug("qmsg:%p tag filter completed, numOfTables:%" PRIzu ", elapsed time:%"PRId64"us", pQueryMsg, tableGroupInfo.numOfTables, el);
|
||||||
|
} else {
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = checkForQueryBuf(tableGroupInfo.numOfTables);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) { // not enough query buffer, abort
|
||||||
|
goto _over;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql);
|
||||||
|
|
||||||
|
param.sql = NULL;
|
||||||
|
param.pExprs = NULL;
|
||||||
|
param.pSecExprs = NULL;
|
||||||
|
param.pGroupbyExpr = NULL;
|
||||||
|
param.pTagColumnInfo = NULL;
|
||||||
|
|
||||||
|
if ((*pQInfo) == NULL) {
|
||||||
|
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
goto _over;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, ¶m, isSTableQuery);
|
||||||
|
|
||||||
|
_over:
|
||||||
|
if (param.pGroupbyExpr != NULL) {
|
||||||
|
taosArrayDestroy(param.pGroupbyExpr->columnInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(param.pTableIdList);
|
||||||
|
param.pTableIdList = NULL;
|
||||||
|
|
||||||
|
freeParam(¶m);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pQueryMsg->numOfCols; i++) {
|
||||||
|
SColumnInfo* column = pQueryMsg->colList + i;
|
||||||
|
freeColumnFilterInfo(column->filters, column->numOfFilters);
|
||||||
|
}
|
||||||
|
|
||||||
|
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
*pQInfo = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// if failed to add ref for all tables in this query, abort current query
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool qTableQuery(qinfo_t qinfo) {
|
||||||
|
SQInfo *pQInfo = (SQInfo *)qinfo;
|
||||||
|
assert(pQInfo && pQInfo->signature == pQInfo);
|
||||||
|
int64_t threadId = taosGetSelfPthreadId();
|
||||||
|
|
||||||
|
int64_t curOwner = 0;
|
||||||
|
if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) {
|
||||||
|
qError("QInfo:%p qhandle is now executed by thread:%p", pQInfo, (void*) curOwner);
|
||||||
|
pQInfo->code = TSDB_CODE_QRY_IN_EXEC;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
pQInfo->startExecTs = taosGetTimestampSec();
|
||||||
|
|
||||||
|
if (isQueryKilled(pQInfo)) {
|
||||||
|
qDebug("QInfo:%p it is already killed, abort", pQInfo);
|
||||||
|
return doBuildResCheck(pQInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
|
||||||
|
qDebug("QInfo:%p no table exists for query, abort", pQInfo);
|
||||||
|
setQueryStatus(pQInfo->runtimeEnv.pQuery, QUERY_COMPLETED);
|
||||||
|
return doBuildResCheck(pQInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
// error occurs, record the error code and return to client
|
||||||
|
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
pQInfo->code = ret;
|
||||||
|
qDebug("QInfo:%p query abort due to error/cancel occurs, code:%s", pQInfo, tstrerror(pQInfo->code));
|
||||||
|
return doBuildResCheck(pQInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("QInfo:%p query task is launched", pQInfo);
|
||||||
|
|
||||||
|
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
|
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
|
||||||
|
assert(pQInfo->runtimeEnv.pQueryHandle == NULL);
|
||||||
|
buildTagQueryResult(pQInfo);
|
||||||
|
} else if (pQInfo->runtimeEnv.stableQuery) {
|
||||||
|
stableQueryImpl(pQInfo);
|
||||||
|
} else if (pQInfo->runtimeEnv.queryBlockDist){
|
||||||
|
buildTableBlockDistResult(pQInfo);
|
||||||
|
} else {
|
||||||
|
tableQueryImpl(pQInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
|
if (isQueryKilled(pQInfo)) {
|
||||||
|
qDebug("QInfo:%p query is killed", pQInfo);
|
||||||
|
} else if (pQuery->rec.rows == 0) {
|
||||||
|
qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
|
||||||
|
} else {
|
||||||
|
qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
|
||||||
|
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
return doBuildResCheck(pQInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext) {
|
||||||
|
SQInfo *pQInfo = (SQInfo *)qinfo;
|
||||||
|
|
||||||
|
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
|
||||||
|
qError("QInfo:%p invalid qhandle", pQInfo);
|
||||||
|
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
*buildRes = false;
|
||||||
|
if (IS_QUERY_KILLED(pQInfo)) {
|
||||||
|
qDebug("QInfo:%p query is killed, code:0x%08x", pQInfo, pQInfo->code);
|
||||||
|
return pQInfo->code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
if (tsRetrieveBlockingModel) {
|
||||||
|
pQInfo->rspContext = pRspContext;
|
||||||
|
tsem_wait(&pQInfo->ready);
|
||||||
|
*buildRes = true;
|
||||||
|
code = pQInfo->code;
|
||||||
|
} else {
|
||||||
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pQInfo->lock);
|
||||||
|
|
||||||
|
assert(pQInfo->rspContext == NULL);
|
||||||
|
if (pQInfo->dataReady == QUERY_RESULT_READY) {
|
||||||
|
*buildRes = true;
|
||||||
|
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%" PRId64 ", code:%s", pQInfo, pQuery->resultRowSize,
|
||||||
|
pQuery->rec.rows, tstrerror(pQInfo->code));
|
||||||
|
} else {
|
||||||
|
*buildRes = false;
|
||||||
|
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
|
||||||
|
pQInfo->rspContext = pRspContext;
|
||||||
|
assert(pQInfo->rspContext != NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = pQInfo->code;
|
||||||
|
pthread_mutex_unlock(&pQInfo->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen, bool* continueExec) {
|
||||||
|
SQInfo *pQInfo = (SQInfo *)qinfo;
|
||||||
|
|
||||||
|
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
|
||||||
|
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
size_t size = getResultSize(pQInfo, &pQuery->rec.rows);
|
||||||
|
|
||||||
|
size += sizeof(int32_t);
|
||||||
|
size += sizeof(STableIdInfo) * taosHashGetSize(pQInfo->arrTableIdInfo);
|
||||||
|
|
||||||
|
*contLen = (int32_t)(size + sizeof(SRetrieveTableRsp));
|
||||||
|
|
||||||
|
// current solution only avoid crash, but cannot return error code to client
|
||||||
|
*pRsp = (SRetrieveTableRsp *)rpcMallocCont(*contLen);
|
||||||
|
if (*pRsp == NULL) {
|
||||||
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*pRsp)->numOfRows = htonl((int32_t)pQuery->rec.rows);
|
||||||
|
|
||||||
|
if (pQInfo->code == TSDB_CODE_SUCCESS) {
|
||||||
|
(*pRsp)->offset = htobe64(pQuery->limit.offset);
|
||||||
|
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
|
||||||
|
} else {
|
||||||
|
(*pRsp)->offset = 0;
|
||||||
|
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
(*pRsp)->precision = htons(pQuery->precision);
|
||||||
|
if (pQuery->rec.rows > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
|
||||||
|
doDumpQueryResult(pQInfo, (*pRsp)->data);
|
||||||
|
} else {
|
||||||
|
setQueryStatus(pQuery, QUERY_OVER);
|
||||||
|
}
|
||||||
|
|
||||||
|
pQInfo->rspContext = NULL;
|
||||||
|
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
|
||||||
|
|
||||||
|
if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
|
||||||
|
// here current thread hold the refcount, so it is safe to free tsdbQueryHandle.
|
||||||
|
*continueExec = false;
|
||||||
|
(*pRsp)->completed = 1; // notify no more result to client
|
||||||
|
} else {
|
||||||
|
*continueExec = true;
|
||||||
|
qDebug("QInfo:%p has more results to retrieve", pQInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
// the memory should be freed if the code of pQInfo is not TSDB_CODE_SUCCESS
|
||||||
|
if (pQInfo->code != TSDB_CODE_SUCCESS) {
|
||||||
|
rpcFreeCont(*pRsp);
|
||||||
|
*pRsp = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pQInfo->code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* qGetResultRetrieveMsg(qinfo_t qinfo) {
|
||||||
|
SQInfo* pQInfo = (SQInfo*) qinfo;
|
||||||
|
assert(pQInfo != NULL);
|
||||||
|
|
||||||
|
return pQInfo->rspContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qKillQuery(qinfo_t qinfo) {
|
||||||
|
SQInfo *pQInfo = (SQInfo *)qinfo;
|
||||||
|
|
||||||
|
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
|
||||||
|
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
setQueryKilled(pQInfo);
|
||||||
|
|
||||||
|
// Wait for the query executing thread being stopped/
|
||||||
|
// Once the query is stopped, the owner of qHandle will be cleared immediately.
|
||||||
|
while (pQInfo->owner != 0) {
|
||||||
|
taosMsleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qQueryCompleted(qinfo_t qinfo) {
|
||||||
|
SQInfo *pQInfo = (SQInfo *)qinfo;
|
||||||
|
|
||||||
|
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
|
||||||
|
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER);
|
||||||
|
}
|
||||||
|
|
||||||
|
void qDestroyQueryInfo(qinfo_t qHandle) {
|
||||||
|
SQInfo* pQInfo = (SQInfo*) qHandle;
|
||||||
|
if (!isValidQInfo(pQInfo)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("QInfo:%p query completed", pQInfo);
|
||||||
|
queryCostStatis(pQInfo); // print the query cost summary
|
||||||
|
freeQInfo(pQInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
void* qOpenQueryMgmt(int32_t vgId) {
|
||||||
|
const int32_t refreshHandleInterval = 30; // every 30 seconds, refresh handle pool
|
||||||
|
|
||||||
|
char cacheName[128] = {0};
|
||||||
|
sprintf(cacheName, "qhandle_%d", vgId);
|
||||||
|
|
||||||
|
SQueryMgmt* pQueryMgmt = calloc(1, sizeof(SQueryMgmt));
|
||||||
|
if (pQueryMgmt == NULL) {
|
||||||
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pQueryMgmt->qinfoPool = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshHandleInterval, true, freeqinfoFn, cacheName);
|
||||||
|
pQueryMgmt->closed = false;
|
||||||
|
pQueryMgmt->vgId = vgId;
|
||||||
|
|
||||||
|
pthread_mutex_init(&pQueryMgmt->lock, NULL);
|
||||||
|
|
||||||
|
qDebug("vgId:%d, open querymgmt success", vgId);
|
||||||
|
return pQueryMgmt;
|
||||||
|
}
|
||||||
|
|
||||||
|
void qQueryMgmtNotifyClosed(void* pQMgmt) {
|
||||||
|
if (pQMgmt == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SQueryMgmt* pQueryMgmt = pQMgmt;
|
||||||
|
qDebug("vgId:%d, set querymgmt closed, wait for all queries cancelled", pQueryMgmt->vgId);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pQueryMgmt->lock);
|
||||||
|
pQueryMgmt->closed = true;
|
||||||
|
pthread_mutex_unlock(&pQueryMgmt->lock);
|
||||||
|
|
||||||
|
taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn);
|
||||||
|
}
|
||||||
|
|
||||||
|
void qQueryMgmtReOpen(void *pQMgmt) {
|
||||||
|
if (pQMgmt == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SQueryMgmt *pQueryMgmt = pQMgmt;
|
||||||
|
qDebug("vgId:%d, set querymgmt reopen", pQueryMgmt->vgId);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pQueryMgmt->lock);
|
||||||
|
pQueryMgmt->closed = false;
|
||||||
|
pthread_mutex_unlock(&pQueryMgmt->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
void qCleanupQueryMgmt(void* pQMgmt) {
|
||||||
|
if (pQMgmt == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SQueryMgmt* pQueryMgmt = pQMgmt;
|
||||||
|
int32_t vgId = pQueryMgmt->vgId;
|
||||||
|
|
||||||
|
assert(pQueryMgmt->closed);
|
||||||
|
|
||||||
|
SCacheObj* pqinfoPool = pQueryMgmt->qinfoPool;
|
||||||
|
pQueryMgmt->qinfoPool = NULL;
|
||||||
|
|
||||||
|
taosCacheCleanup(pqinfoPool);
|
||||||
|
pthread_mutex_destroy(&pQueryMgmt->lock);
|
||||||
|
tfree(pQueryMgmt);
|
||||||
|
|
||||||
|
qDebug("vgId:%d, queryMgmt cleanup completed", vgId);
|
||||||
|
}
|
||||||
|
|
||||||
|
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
||||||
|
if (pMgmt == NULL) {
|
||||||
|
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SQueryMgmt *pQueryMgmt = pMgmt;
|
||||||
|
if (pQueryMgmt->qinfoPool == NULL) {
|
||||||
|
qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo);
|
||||||
|
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pQueryMgmt->lock);
|
||||||
|
if (pQueryMgmt->closed) {
|
||||||
|
pthread_mutex_unlock(&pQueryMgmt->lock);
|
||||||
|
qError("QInfo:%p failed to add qhandle into cache, since qMgmt is colsing", (void *)qInfo);
|
||||||
|
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
|
return NULL;
|
||||||
|
} else {
|
||||||
|
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo;
|
||||||
|
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
|
||||||
|
(getMaximumIdleDurationSec()*1000));
|
||||||
|
pthread_mutex_unlock(&pQueryMgmt->lock);
|
||||||
|
|
||||||
|
return handle;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void** qAcquireQInfo(void* pMgmt, uint64_t _key) {
|
||||||
|
SQueryMgmt *pQueryMgmt = pMgmt;
|
||||||
|
|
||||||
|
if (pQueryMgmt->closed) {
|
||||||
|
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pQueryMgmt->qinfoPool == NULL) {
|
||||||
|
terrno = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
TSDB_CACHE_PTR_TYPE key = (TSDB_CACHE_PTR_TYPE)_key;
|
||||||
|
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &key, sizeof(TSDB_CACHE_PTR_TYPE));
|
||||||
|
if (handle == NULL || *handle == NULL) {
|
||||||
|
terrno = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
|
return NULL;
|
||||||
|
} else {
|
||||||
|
return handle;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) {
|
||||||
|
SQueryMgmt *pQueryMgmt = pMgmt;
|
||||||
|
if (pQueryMgmt->qinfoPool == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle);
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -125,7 +125,7 @@ void taosArrayRemove(SArray* pArray, size_t index);
|
||||||
* @param pDst
|
* @param pDst
|
||||||
* @param pSrc
|
* @param pSrc
|
||||||
*/
|
*/
|
||||||
void taosArrayCopy(SArray* pDst, const SArray* pSrc);
|
SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* clone a new array
|
* clone a new array
|
||||||
|
|
|
@ -156,23 +156,14 @@ void taosArrayRemove(SArray* pArray, size_t index) {
|
||||||
pArray->size -= 1;
|
pArray->size -= 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosArrayCopy(SArray* pDst, const SArray* pSrc) {
|
SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) {
|
||||||
assert(pSrc != NULL && pDst != NULL);
|
assert(src != NULL && elemSize > 0);
|
||||||
|
SArray* pDst = taosArrayInit(size, elemSize);
|
||||||
if (pDst->capacity < pSrc->size) {
|
|
||||||
void* pData = realloc(pDst->pData, pSrc->size * pSrc->elemSize);
|
memcpy(pDst->pData, src, elemSize * size);
|
||||||
if (pData == NULL) { // todo handle oom
|
pDst->size = size;
|
||||||
|
|
||||||
} else {
|
return pDst;
|
||||||
pDst->pData = pData;
|
|
||||||
pDst->capacity = pSrc->size;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(pDst->pData, pSrc->pData, pSrc->elemSize * pSrc->size);
|
|
||||||
pDst->elemSize = pSrc->elemSize;
|
|
||||||
pDst->capacity = pSrc->size;
|
|
||||||
pDst->size = pSrc->size;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* taosArrayDup(const SArray* pSrc) {
|
SArray* taosArrayDup(const SArray* pSrc) {
|
||||||
|
|
|
@ -1276,6 +1276,7 @@ class Task():
|
||||||
0x510, # vnode not in ready state
|
0x510, # vnode not in ready state
|
||||||
0x14, # db not ready, errno changed
|
0x14, # db not ready, errno changed
|
||||||
0x600, # Invalid table ID, why?
|
0x600, # Invalid table ID, why?
|
||||||
|
0x218, # Table does not exist
|
||||||
1000 # REST catch-all error
|
1000 # REST catch-all error
|
||||||
]:
|
]:
|
||||||
return True # These are the ALWAYS-ACCEPTABLE ones
|
return True # These are the ALWAYS-ACCEPTABLE ones
|
||||||
|
|
|
@ -133,7 +133,7 @@ sleep 100
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
#sql alter table tb1 drop column c3
|
#sql alter table tb1 drop column c3
|
||||||
#sleep 2000
|
#sleep 500
|
||||||
#sql insert into tb1 values (now, 2, 'taos')
|
#sql insert into tb1 values (now, 2, 'taos')
|
||||||
#sleep 30000
|
#sleep 30000
|
||||||
#sql select * from strm
|
#sql select * from strm
|
||||||
|
@ -144,7 +144,7 @@ sleep 100
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
#sql alter table tb1 add column c3 int
|
#sql alter table tb1 add column c3 int
|
||||||
#sleep 2000
|
#sleep 500
|
||||||
#sql insert into tb1 values (now, 3, 'taos', 3);
|
#sql insert into tb1 values (now, 3, 'taos', 3);
|
||||||
#sleep 100
|
#sleep 100
|
||||||
#sql select * from strm
|
#sql select * from strm
|
||||||
|
|
|
@ -208,7 +208,7 @@ endi
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
sql connect
|
sql connect
|
||||||
|
|
|
@ -105,7 +105,7 @@ run general/parser/col_arithmetic_query.sim
|
||||||
#======================================= all in files query =======================================
|
#======================================= all in files query =======================================
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
|
|
|
@ -82,7 +82,7 @@ endw
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
sleep 100
|
sleep 100
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
|
|
|
@ -77,7 +77,7 @@ run general/parser/first_last_query.sim
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
sql connect
|
sql connect
|
||||||
|
|
|
@ -25,15 +25,15 @@ sql use $db
|
||||||
sql create table tb (ts timestamp, c1 int, c2 timestamp)
|
sql create table tb (ts timestamp, c1 int, c2 timestamp)
|
||||||
sql insert into tb values ('2019-05-05 11:30:00.000', 1, now)
|
sql insert into tb values ('2019-05-05 11:30:00.000', 1, now)
|
||||||
sql insert into tb values ('2019-05-05 12:00:00.000', 1, now)
|
sql insert into tb values ('2019-05-05 12:00:00.000', 1, now)
|
||||||
sleep 2000
|
sleep 500
|
||||||
sql import into tb values ('2019-05-05 11:00:00.000', -1, now)
|
sql import into tb values ('2019-05-05 11:00:00.000', -1, now)
|
||||||
sleep 2000
|
sleep 500
|
||||||
sql import into tb values ('2019-05-05 11:59:00.000', -1, now)
|
sql import into tb values ('2019-05-05 11:59:00.000', -1, now)
|
||||||
sleep 2000
|
sleep 500
|
||||||
sql import into tb values ('2019-05-04 08:00:00.000', -1, now)
|
sql import into tb values ('2019-05-04 08:00:00.000', -1, now)
|
||||||
sleep 2000
|
sleep 500
|
||||||
sql import into tb values ('2019-05-04 07:59:00.000', -1, now)
|
sql import into tb values ('2019-05-04 07:59:00.000', -1, now)
|
||||||
sleep 2000
|
sleep 500
|
||||||
|
|
||||||
sql select * from tb
|
sql select * from tb
|
||||||
if $rows != 6 then
|
if $rows != 6 then
|
||||||
|
@ -60,7 +60,7 @@ endi
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
sql connect
|
sql connect
|
||||||
|
|
|
@ -40,7 +40,7 @@ while $x < $rowNum
|
||||||
endw
|
endw
|
||||||
print ====== tables created
|
print ====== tables created
|
||||||
|
|
||||||
sleep 2000
|
sleep 500
|
||||||
|
|
||||||
$ts = $ts0 + $delta
|
$ts = $ts0 + $delta
|
||||||
$ts = $ts + 1
|
$ts = $ts + 1
|
||||||
|
|
|
@ -39,7 +39,7 @@ while $x < $rowNum
|
||||||
endw
|
endw
|
||||||
print ====== tables created
|
print ====== tables created
|
||||||
|
|
||||||
sleep 2000
|
sleep 500
|
||||||
|
|
||||||
$ts = $ts0 + $delta
|
$ts = $ts0 + $delta
|
||||||
$ts = $ts + 1
|
$ts = $ts + 1
|
||||||
|
|
|
@ -39,7 +39,7 @@ while $x < $rowNum
|
||||||
endw
|
endw
|
||||||
print ====== tables created
|
print ====== tables created
|
||||||
|
|
||||||
sleep 2000
|
sleep 500
|
||||||
|
|
||||||
$ts = $ts + 1
|
$ts = $ts + 1
|
||||||
sql insert into $tb values ( $ts , -1, -1, -1, -1, -1)
|
sql insert into $tb values ( $ts , -1, -1, -1, -1, -1)
|
||||||
|
@ -47,7 +47,7 @@ $ts = $ts0 + $delta
|
||||||
$ts = $ts + 1
|
$ts = $ts + 1
|
||||||
sql import into $tb values ( $ts , -2, -2, -2, -2, -2)
|
sql import into $tb values ( $ts , -2, -2, -2, -2, -2)
|
||||||
|
|
||||||
sleep 2000
|
sleep 500
|
||||||
|
|
||||||
sql show databases
|
sql show databases
|
||||||
|
|
||||||
|
|
|
@ -3,9 +3,9 @@ system sh/stop_dnodes.sh
|
||||||
system sh/deploy.sh -n dnode1 -i 1
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
system sh/cfg.sh -n dnode1 -c walLevel -v 0
|
system sh/cfg.sh -n dnode1 -c walLevel -v 0
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
sleep 2000
|
sleep 500
|
||||||
sql connect
|
sql connect
|
||||||
sleep 2000
|
sleep 500
|
||||||
|
|
||||||
sql drop database if exists indb
|
sql drop database if exists indb
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,7 @@ system sh/stop_dnodes.sh
|
||||||
system sh/deploy.sh -n dnode1 -i 1
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
system sh/cfg.sh -n dnode1 -c walLevel -v 0
|
system sh/cfg.sh -n dnode1 -c walLevel -v 0
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
sleep 2000
|
sleep 500
|
||||||
sql connect
|
sql connect
|
||||||
sleep 100
|
sleep 100
|
||||||
print ======================== dnode1 start
|
print ======================== dnode1 start
|
||||||
|
|
|
@ -59,7 +59,7 @@ run general/parser/interp_test.sim
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
|
|
||||||
|
|
|
@ -62,7 +62,7 @@ run general/parser/lastrow_query.sim
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
sql connect
|
sql connect
|
||||||
|
|
|
@ -66,7 +66,7 @@ run general/parser/limit_stb.sim
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
sql connect
|
sql connect
|
||||||
|
|
|
@ -61,7 +61,7 @@ run general/parser/limit1_stb.sim
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
|
|
||||||
|
|
|
@ -61,7 +61,7 @@ run general/parser/limit1_stb.sim
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
|
|
||||||
|
|
|
@ -69,7 +69,7 @@ print ====== tables created
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
|
|
||||||
|
|
|
@ -143,6 +143,97 @@ if $data11 != -1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000, -2) limit 8200
|
||||||
|
if $rows != 8200 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000, -2) limit 10 offset 8190;
|
||||||
|
if $rows != 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != @18-10-15 19:30:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 5 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data10 != @18-10-15 19:35:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != -1000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data20 != @18-10-15 19:40:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data21 != 6 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data30 != @18-10-15 19:45:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data31 != -1000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000, -2) limit 10 offset 10001;
|
||||||
|
if $rows != 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != @18-10-22 02:25:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != -1000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data10 != @18-10-22 02:30:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data20 != @18-10-22 02:35:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data21 != -1000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data30 != @18-10-22 02:40:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data31 != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000, -2) limit 10000 offset 10001;
|
||||||
|
print ====> needs to validate the last row result
|
||||||
|
if $rows != 9998 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
sql select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000, -2) limit 100 offset 20001;
|
||||||
|
if $rows != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
# tb + interval + fill(linear) + limit offset
|
# tb + interval + fill(linear) + limit offset
|
||||||
$limit = $rowNum
|
$limit = $rowNum
|
||||||
$offset = $limit / 2
|
$offset = $limit / 2
|
||||||
|
|
|
@ -59,7 +59,7 @@ sql show databases
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
|
|
||||||
|
@ -154,7 +154,7 @@ sql insert into t2 values('2020-1-1 1:5:1', 99);
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
sql select ts from m1 where ts='2020-1-1 1:5:1'
|
sql select ts from m1 where ts='2020-1-1 1:5:1'
|
||||||
|
|
|
@ -334,6 +334,9 @@ sql insert into tm0 values(10000, 1) (20000, 2)(30000, 3) (40000, NULL) (50000,
|
||||||
|
|
||||||
#=============================tbase-1205
|
#=============================tbase-1205
|
||||||
sql select count(*) from tm1 where ts<now and ts>= now -1d interval(1h) fill(NULL);
|
sql select count(*) from tm1 where ts<now and ts>= now -1d interval(1h) fill(NULL);
|
||||||
|
if $rows != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
print ===================>TD-1834
|
print ===================>TD-1834
|
||||||
sql select * from tm0 where ts>11000 and ts< 20000 order by ts asc
|
sql select * from tm0 where ts>11000 and ts< 20000 order by ts asc
|
||||||
|
@ -409,7 +412,7 @@ sql_error select k, sum(k)+1 from tm0;
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
|
|
||||||
|
|
|
@ -118,7 +118,7 @@ endw
|
||||||
|
|
||||||
print ====== restart server to commit data into disk
|
print ====== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ====== server restart completed
|
print ====== server restart completed
|
||||||
sleep 100
|
sleep 100
|
||||||
|
|
|
@ -35,7 +35,7 @@ sql insert into $tb values ('2018-09-17 09:00:00.030', 3)
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
sql connect
|
sql connect
|
||||||
|
|
|
@ -61,7 +61,7 @@ while $i < $tbNum
|
||||||
endw
|
endw
|
||||||
print ====== tables created
|
print ====== tables created
|
||||||
|
|
||||||
sleep 2000
|
sleep 500
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
if $rows != $tbNum then
|
if $rows != $tbNum then
|
||||||
|
|
|
@ -32,7 +32,7 @@ run general/parser/single_row_in_tb_query.sim
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
|
|
||||||
|
|
|
@ -97,7 +97,7 @@ run general/parser/slimit_query.sim
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
sql connect
|
sql connect
|
||||||
|
|
|
@ -56,7 +56,7 @@ run general/parser/slimit1_query.sim
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
sql connect
|
sql connect
|
||||||
|
|
|
@ -93,7 +93,7 @@ if $data02 != tb0 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sleep 2000
|
sleep 500
|
||||||
sql reset query cache
|
sql reset query cache
|
||||||
sql select count(*), first(ts) from stb group by tg_added order by tg_added asc slimit 5 soffset 3
|
sql select count(*), first(ts) from stb group by tg_added order by tg_added asc slimit 5 soffset 3
|
||||||
if $rows != 5 then
|
if $rows != 5 then
|
||||||
|
@ -171,7 +171,7 @@ endi
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
sql connect
|
sql connect
|
||||||
|
|
|
@ -67,7 +67,7 @@ run general/parser/tbnameIn_query.sim
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
|
|
||||||
|
|
|
@ -1,84 +1,84 @@
|
||||||
#run general/parser/alter.sim
|
run general/parser/alter.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/alter1.sim
|
run general/parser/alter1.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/alter_stable.sim
|
run general/parser/alter_stable.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/auto_create_tb.sim
|
run general/parser/auto_create_tb.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/auto_create_tb_drop_tb.sim
|
run general/parser/auto_create_tb_drop_tb.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/col_arithmetic_operation.sim
|
run general/parser/col_arithmetic_operation.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/columnValue.sim
|
run general/parser/columnValue.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/commit.sim
|
run general/parser/commit.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/create_db.sim
|
run general/parser/create_db.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/create_mt.sim
|
run general/parser/create_mt.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/create_tb.sim
|
run general/parser/create_tb.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/dbtbnameValidate.sim
|
run general/parser/dbtbnameValidate.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/fill.sim
|
run general/parser/fill.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/fill_stb.sim
|
run general/parser/fill_stb.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
##run general/parser/fill_us.sim #
|
#run general/parser/fill_us.sim #
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/first_last.sim
|
run general/parser/first_last.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/import_commit1.sim
|
run general/parser/import_commit1.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/import_commit2.sim
|
run general/parser/import_commit2.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/import_commit3.sim
|
run general/parser/import_commit3.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
##run general/parser/import_file.sim
|
#run general/parser/import_file.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/insert_tb.sim
|
run general/parser/insert_tb.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/tags_dynamically_specifiy.sim
|
run general/parser/tags_dynamically_specifiy.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/interp.sim
|
run general/parser/interp.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/lastrow.sim
|
run general/parser/lastrow.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/limit.sim
|
run general/parser/limit.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/limit1.sim
|
run general/parser/limit1.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/limit1_tblocks100.sim
|
run general/parser/limit1_tblocks100.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/limit2.sim
|
run general/parser/limit2.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/mixed_blocks.sim
|
run general/parser/mixed_blocks.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/nchar.sim
|
run general/parser/nchar.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/null_char.sim
|
run general/parser/null_char.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/selectResNum.sim
|
run general/parser/selectResNum.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/select_across_vnodes.sim
|
run general/parser/select_across_vnodes.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/select_from_cache_disk.sim
|
run general/parser/select_from_cache_disk.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/set_tag_vals.sim
|
run general/parser/set_tag_vals.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/single_row_in_tb.sim
|
run general/parser/single_row_in_tb.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/slimit.sim
|
run general/parser/slimit.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/slimit1.sim
|
run general/parser/slimit1.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/slimit_alter_tags.sim
|
run general/parser/slimit_alter_tags.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/tbnameIn.sim
|
run general/parser/tbnameIn.sim
|
||||||
#sleep 100
|
sleep 100
|
||||||
#run general/parser/slimit_alter_tags.sim # persistent failed
|
run general/parser/slimit_alter_tags.sim # persistent failed
|
||||||
sleep 100
|
sleep 100
|
||||||
run general/parser/join.sim
|
run general/parser/join.sim
|
||||||
sleep 100
|
sleep 100
|
||||||
|
|
|
@ -128,7 +128,7 @@ sql insert into test values(29999, 1)(70000, 2)(80000, 3)
|
||||||
|
|
||||||
print ================== restart server to commit data into disk
|
print ================== restart server to commit data into disk
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
print ================== server restart completed
|
print ================== server restart completed
|
||||||
sql connect
|
sql connect
|
||||||
|
|
|
@ -324,7 +324,7 @@ while $i < 1
|
||||||
endw
|
endw
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 2000
|
sleep 500
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue