commit
1e3e40f6ea
|
@ -210,7 +210,7 @@ typedef struct SQueryInfo {
|
|||
int16_t numOfTables;
|
||||
STableMetaInfo **pTableMetaInfo;
|
||||
struct STSBuf * tsBuf;
|
||||
int64_t * defaultVal; // default value for interpolation
|
||||
int64_t * fillVal; // default value for interpolation
|
||||
char * msg; // pointer to the pCmd->payload to keep error message temporarily
|
||||
int64_t clauseLimit; // limit for current sub clause
|
||||
|
||||
|
|
|
@ -390,7 +390,11 @@ static void function_finalizer(SQLFunctionCtx *pCtx) {
|
|||
|
||||
if (pResInfo->hasResult != DATA_SET_FLAG) {
|
||||
tscTrace("no result generated, result is set to NULL");
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
|
||||
} else {
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
}
|
||||
}
|
||||
|
||||
doFinalizer(pCtx);
|
||||
|
@ -1864,12 +1868,22 @@ static void last_row_finalizer(SQLFunctionCtx *pCtx) {
|
|||
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
if (pCtx->currentStage == SECONDARY_STAGE_MERGE) {
|
||||
if (pResInfo->hasResult != DATA_SET_FLAG) {
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
|
||||
} else {
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
if (pResInfo->hasResult != DATA_SET_FLAG) {
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
|
||||
} else {
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -2885,7 +2899,12 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) {
|
|||
SLeastsquareInfo *pInfo = pResInfo->interResultBuf;
|
||||
|
||||
if (pInfo->num == 0) {
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
|
||||
} else {
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -3139,7 +3158,7 @@ static void diff_function(SQLFunctionCtx *pCtx) {
|
|||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
} else {
|
||||
*pOutput = pData[i] - pCtx->param[1].i64Key;
|
||||
*pOutput = pData[i] - pCtx->param[1].dKey;
|
||||
*pTimestamp = pCtx->ptsList[i];
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
|
@ -3170,7 +3189,7 @@ static void diff_function(SQLFunctionCtx *pCtx) {
|
|||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
} else {
|
||||
*pOutput = pData[i] - pCtx->param[1].i64Key;
|
||||
*pOutput = pData[i] - pCtx->param[1].dKey;
|
||||
*pTimestamp = pCtx->ptsList[i];
|
||||
|
||||
pOutput += 1;
|
||||
|
@ -3862,7 +3881,11 @@ static void interp_function(SQLFunctionCtx *pCtx) {
|
|||
*(TSKEY *)pCtx->aOutputBuf = pInfoDetail->ts;
|
||||
} else {
|
||||
if (pInfoDetail->type == TSDB_FILL_NULL) {
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
|
||||
} else {
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
}
|
||||
} else if (pInfoDetail->type == TSDB_FILL_SET_VALUE) {
|
||||
tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType);
|
||||
} else if (pInfoDetail->type == TSDB_FILL_PREV) {
|
||||
|
@ -3914,7 +3937,11 @@ static void interp_function(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
} else {
|
||||
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
|
||||
if (srcType == TSDB_DATA_TYPE_BINARY || srcType == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(pCtx->aOutputBuf, pCtx->inputBytes);
|
||||
} else {
|
||||
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -312,8 +312,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
case TSDB_DATA_TYPE_BINARY:
|
||||
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost
|
||||
if (pToken->type == TK_NULL) {
|
||||
varDataSetLen(payload, sizeof(int8_t));
|
||||
*(uint8_t*) varDataVal(payload) = TSDB_DATA_BINARY_NULL;
|
||||
setVardataNull(payload, TSDB_DATA_TYPE_BINARY);
|
||||
} else { // too long values will return invalid sql, not be truncated automatically
|
||||
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { //todo refactor
|
||||
return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z);
|
||||
|
@ -326,8 +325,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
if (pToken->type == TK_NULL) {
|
||||
varDataSetLen(payload, sizeof(int32_t));
|
||||
*(uint32_t*) varDataVal(payload) = TSDB_DATA_NCHAR_NULL;
|
||||
setVardataNull(payload, TSDB_DATA_TYPE_NCHAR);
|
||||
} else {
|
||||
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
|
||||
size_t output = 0;
|
||||
|
|
|
@ -4015,9 +4015,9 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
|
|||
|
||||
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
|
||||
|
||||
if (pQueryInfo->defaultVal == NULL) {
|
||||
pQueryInfo->defaultVal = calloc(size, sizeof(int64_t));
|
||||
if (pQueryInfo->defaultVal == NULL) {
|
||||
if (pQueryInfo->fillVal == NULL) {
|
||||
pQueryInfo->fillVal = calloc(size, sizeof(int64_t));
|
||||
if (pQueryInfo->fillVal == NULL) {
|
||||
return TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
@ -4028,7 +4028,11 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
|
|||
pQueryInfo->fillType = TSDB_FILL_NULL;
|
||||
for (int32_t i = START_INTERPO_COL_IDX; i < size; ++i) {
|
||||
TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
|
||||
setNull((char*)&pQueryInfo->defaultVal[i], pFields->type, pFields->bytes);
|
||||
if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type);
|
||||
} else {
|
||||
setNull((char*)&pQueryInfo->fillVal[i], pFields->type, pFields->bytes);
|
||||
};
|
||||
}
|
||||
} else if (strncasecmp(pItem->pVar.pz, "prev", 4) == 0 && pItem->pVar.nLen == 4) {
|
||||
pQueryInfo->fillType = TSDB_FILL_PREV;
|
||||
|
@ -4061,11 +4065,11 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
|
|||
TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
|
||||
|
||||
if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
setNull((char*)(&pQueryInfo->defaultVal[i]), pFields->type, pFields->bytes);
|
||||
setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t ret = tVariantDump(&pFillToken->a[j].pVar, (char*)&pQueryInfo->defaultVal[i], pFields->type);
|
||||
int32_t ret = tVariantDump(&pFillToken->a[j].pVar, (char*)&pQueryInfo->fillVal[i], pFields->type);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg);
|
||||
}
|
||||
|
@ -4079,9 +4083,9 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
|
|||
TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
|
||||
|
||||
if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
setNull((char*)(&pQueryInfo->defaultVal[i]), pFields->type, pFields->bytes);
|
||||
setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type);
|
||||
} else {
|
||||
tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->defaultVal[i], pFields->type);
|
||||
tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->fillVal[i], pFields->type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -145,7 +145,7 @@ static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) {
|
|||
pFillCol[i].flag = pExpr->colInfo.flag;
|
||||
pFillCol[i].col.offset = offset;
|
||||
pFillCol[i].functionId = pExpr->functionId;
|
||||
pFillCol[i].defaultVal.i = pQueryInfo->defaultVal[i];
|
||||
pFillCol[i].fillVal.i = pQueryInfo->fillVal[i];
|
||||
offset += pExpr->resBytes;
|
||||
}
|
||||
|
||||
|
@ -946,8 +946,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
|
|||
}
|
||||
|
||||
while (1) {
|
||||
int64_t newRows = -1;
|
||||
taosGenerateDataBlock(pFillInfo, pResPages, &newRows, pLocalReducer->resColModel->capacity);
|
||||
int64_t newRows = taosGenerateDataBlock(pFillInfo, pResPages, pLocalReducer->resColModel->capacity);
|
||||
|
||||
if (pQueryInfo->limit.offset < newRows) {
|
||||
newRows -= pQueryInfo->limit.offset;
|
||||
|
|
|
@ -781,8 +781,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
|
||||
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
|
||||
*((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
|
||||
pMsg += sizeof(pQueryInfo->defaultVal[0]);
|
||||
*((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
|
||||
pMsg += sizeof(pQueryInfo->fillVal[0]);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -223,7 +223,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
|
|||
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
|
||||
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
|
||||
|
||||
assignVal(pSql->res.data + offset, (char *)(&pQueryInfo->defaultVal[i]), pField->bytes, pField->type);
|
||||
assignVal(pSql->res.data + offset, (char *)(&pQueryInfo->fillVal[i]), pField->bytes, pField->type);
|
||||
row[i] = pSql->res.data + offset;
|
||||
}
|
||||
|
||||
|
|
|
@ -281,7 +281,7 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
|
|||
}
|
||||
|
||||
pQueryInfo->fillType = TSDB_FILL_NONE;
|
||||
tfree(pQueryInfo->defaultVal);
|
||||
tfree(pQueryInfo->fillVal);
|
||||
}
|
||||
|
||||
int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
|
||||
|
@ -1616,7 +1616,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
|
|||
|
||||
pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf);
|
||||
|
||||
tfree(pQueryInfo->defaultVal);
|
||||
tfree(pQueryInfo->fillVal);
|
||||
}
|
||||
|
||||
void tscClearSubqueryInfo(SSqlCmd* pCmd) {
|
||||
|
@ -1768,7 +1768,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
pNewQueryInfo->order = pQueryInfo->order;
|
||||
pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit;
|
||||
pNewQueryInfo->pTableMetaInfo = NULL;
|
||||
pNewQueryInfo->defaultVal = NULL;
|
||||
pNewQueryInfo->fillVal = NULL;
|
||||
pNewQueryInfo->numOfTables = 0;
|
||||
pNewQueryInfo->tsBuf = NULL;
|
||||
|
||||
|
@ -1780,8 +1780,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
|
||||
|
||||
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
|
||||
pNewQueryInfo->defaultVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
|
||||
memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
|
||||
pNewQueryInfo->fillVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
|
||||
memcpy(pNewQueryInfo->fillVal, pQueryInfo->fillVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
|
||||
}
|
||||
|
||||
if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -381,6 +381,18 @@ bool isNull(const char *val, int32_t type) {
|
|||
};
|
||||
}
|
||||
|
||||
void setVardataNull(char* val, int32_t type) {
|
||||
if (type == TSDB_DATA_TYPE_BINARY) {
|
||||
varDataSetLen(val, sizeof(int8_t));
|
||||
*(uint8_t*) varDataVal(val) = TSDB_DATA_BINARY_NULL;
|
||||
} else if (type == TSDB_DATA_TYPE_NCHAR) {
|
||||
varDataSetLen(val, sizeof(int32_t));
|
||||
*(uint32_t*) varDataVal(val) = TSDB_DATA_NCHAR_NULL;
|
||||
} else {
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
|
||||
void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); }
|
||||
|
||||
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
|
||||
|
@ -483,7 +495,7 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) {
|
|||
break;
|
||||
};
|
||||
case TSDB_DATA_TYPE_NCHAR: {
|
||||
wcsncpy((wchar_t*)val, (wchar_t*)src, len / TSDB_NCHAR_SIZE);
|
||||
varDataCopy(val, src);
|
||||
break;
|
||||
};
|
||||
default: {
|
||||
|
|
|
@ -157,6 +157,7 @@ extern tDataTypeDescriptor tDataTypeDesc[11];
|
|||
bool isValidDataType(int32_t type, int32_t length);
|
||||
bool isNull(const char *val, int32_t type);
|
||||
|
||||
void setVardataNull(char* val, int32_t type);
|
||||
void setNull(char *val, int32_t type, int32_t bytes);
|
||||
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems);
|
||||
|
||||
|
|
|
@ -443,7 +443,7 @@ typedef struct {
|
|||
int16_t numOfOutput; // final output columns numbers
|
||||
int16_t tagNameRelType; // relation of tag criteria and tbname criteria
|
||||
int16_t fillType; // interpolate type
|
||||
uint64_t defaultVal; // default value array list
|
||||
uint64_t fillVal; // default value array list
|
||||
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
|
||||
int32_t tsLen; // total length of ts comp block
|
||||
int32_t tsNumOfBlocks; // ts comp block numbers
|
||||
|
|
|
@ -138,7 +138,7 @@ typedef struct SQuery {
|
|||
SColumnInfo* colList;
|
||||
SColumnInfo* tagColList;
|
||||
int32_t numOfFilterCols;
|
||||
int64_t* defaultVal;
|
||||
int64_t* fillVal;
|
||||
uint32_t status; // query status
|
||||
SResultRec rec;
|
||||
int32_t pos;
|
||||
|
|
|
@ -28,7 +28,7 @@ typedef struct {
|
|||
STColumn col; // column info
|
||||
int16_t functionId; // sql function id
|
||||
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
|
||||
union {int64_t i; double d;} defaultVal;
|
||||
union {int64_t i; double d;} fillVal;
|
||||
} SFillColInfo;
|
||||
|
||||
typedef struct SFillInfo {
|
||||
|
@ -75,15 +75,13 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInpu
|
|||
|
||||
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision);
|
||||
|
||||
int32_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows);
|
||||
int64_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows);
|
||||
|
||||
int32_t taosNumOfRemainRows(SFillInfo *pFillInfo);
|
||||
|
||||
int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfRows, int32_t outputRows, char** srcData);
|
||||
|
||||
int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
|
||||
|
||||
void taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int64_t* outputRows, int32_t capacity);
|
||||
int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -376,11 +376,16 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
|
|||
|
||||
bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.functionId == TSDB_FUNC_TS_COMP; }
|
||||
|
||||
static bool limitResults(SQuery *pQuery) {
|
||||
static bool limitResults(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
|
||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
if ((pQuery->limit.limit > 0) && (pQuery->rec.total + pQuery->rec.rows > pQuery->limit.limit)) {
|
||||
pQuery->rec.rows = pQuery->limit.limit - pQuery->rec.total;
|
||||
assert(pQuery->rec.rows > 0);
|
||||
|
||||
|
||||
qTrace("QInfo:%p discard remain data due to result limitation, limit:%"PRId64", current return:%d, total:%"PRId64,
|
||||
pQInfo, pQuery->limit.limit, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
|
||||
assert(pQuery->rec.rows >= 0);
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
return true;
|
||||
}
|
||||
|
@ -624,15 +629,17 @@ static int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t sea
|
|||
/**
|
||||
* NOTE: the query status only set for the first scan of master scan.
|
||||
*/
|
||||
static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SWindowResInfo *pWindowResInfo) {
|
||||
static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SWindowResInfo *pWindowResInfo) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
if (pRuntimeEnv->scanFlag != MASTER_SCAN || (!isIntervalQuery(pQuery))) {
|
||||
return;
|
||||
return pWindowResInfo->size;
|
||||
}
|
||||
|
||||
// no qualified results exist, abort check
|
||||
int32_t numOfClosed = 0;
|
||||
|
||||
if (pWindowResInfo->size == 0) {
|
||||
return;
|
||||
return pWindowResInfo->size;
|
||||
}
|
||||
|
||||
// query completed
|
||||
|
@ -646,10 +653,10 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
|
|||
int32_t i = 0;
|
||||
int64_t skey = TSKEY_INITIAL_VAL;
|
||||
|
||||
// TODO opt performance: get the closed time window here
|
||||
for (i = 0; i < pWindowResInfo->size; ++i) {
|
||||
SWindowResult *pResult = &pWindowResInfo->pResult[i];
|
||||
if (pResult->status.closed) {
|
||||
numOfClosed += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -672,16 +679,26 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
|
|||
|
||||
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex].window.skey;
|
||||
|
||||
// the number of completed slots are larger than the threshold, dump to client immediately.
|
||||
int32_t n = numOfClosedTimeWindow(pWindowResInfo);
|
||||
if (n > pWindowResInfo->threshold) {
|
||||
// the number of completed slots are larger than the threshold, return current generated results to client.
|
||||
if (numOfClosed > pWindowResInfo->threshold) {
|
||||
qTrace("QInfo:%p total result window:%d closed:%d, reached the output threshold %d, return",
|
||||
GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, numOfClosed, pQuery->rec.threshold);
|
||||
|
||||
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
|
||||
} else {
|
||||
qTrace("QInfo:%p total result window:%d already closed:%d", GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size,
|
||||
numOfClosed);
|
||||
}
|
||||
|
||||
qTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, n);
|
||||
}
|
||||
|
||||
|
||||
// output has reached the limitation, set query completed
|
||||
if (pQuery->limit.limit > 0 && (pQuery->limit.limit + pQuery->limit.offset) <= numOfClosed &&
|
||||
pRuntimeEnv->scanFlag == MASTER_SCAN) {
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
}
|
||||
|
||||
assert(pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL);
|
||||
return numOfClosed;
|
||||
}
|
||||
|
||||
static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn,
|
||||
|
@ -1309,28 +1326,27 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
|
|||
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
|
||||
pTableQInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
|
||||
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
|
||||
|
||||
// interval query with limit applied
|
||||
if (isIntervalQuery(pQuery) && pQuery->limit.limit > 0 &&
|
||||
(pQuery->limit.limit + pQuery->limit.offset) <= numOfClosedTimeWindow(pWindowResInfo) &&
|
||||
pRuntimeEnv->scanFlag == MASTER_SCAN) {
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
}
|
||||
int32_t numOfRes = 0;
|
||||
|
||||
if (isIntervalQuery(pQuery)) {
|
||||
numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
|
||||
} else {
|
||||
numOfRes = getNumOfResult(pRuntimeEnv);
|
||||
|
||||
int32_t numOfRes = getNumOfResult(pRuntimeEnv);
|
||||
// update the number of output result
|
||||
if (numOfRes > 0 && pQuery->checkBuffer == 1) {
|
||||
assert(numOfRes >= pQuery->rec.rows);
|
||||
pQuery->rec.rows = numOfRes;
|
||||
|
||||
// update the number of output result
|
||||
if (numOfRes > 0 && pQuery->checkBuffer == 1) {
|
||||
assert(numOfRes >= pQuery->rec.rows);
|
||||
pQuery->rec.rows = numOfRes;
|
||||
if (numOfRes >= pQuery->rec.threshold) {
|
||||
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
|
||||
}
|
||||
|
||||
if (numOfRes >= pQuery->rec.threshold) {
|
||||
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
|
||||
}
|
||||
|
||||
if ((pQuery->limit.limit >= 0) && numOfRes >= (pQuery->limit.limit + pQuery->limit.offset)) {
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
if ((pQuery->limit.limit >= 0) && (pQuery->limit.limit + pQuery->limit.offset) <= numOfRes) {
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2026,10 +2042,10 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
|
|||
|
||||
tVariantCreateFromBinary(&pCtx->param[3], (char *)&count, sizeof(count), TSDB_DATA_TYPE_INT);
|
||||
|
||||
if (isNull((char *)&pQuery->defaultVal[i], pCtx->inputType)) {
|
||||
if (isNull((char *)&pQuery->fillVal[i], pCtx->inputType)) {
|
||||
pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
|
||||
} else {
|
||||
tVariantCreateFromBinary(&pCtx->param[1], (char *)&pQuery->defaultVal[i], pCtx->inputBytes, pCtx->inputType);
|
||||
tVariantCreateFromBinary(&pCtx->param[1], (char *)&pQuery->fillVal[i], pCtx->inputBytes, pCtx->inputType);
|
||||
}
|
||||
|
||||
pInterpDetail->ts = pQuery->window.skey;
|
||||
|
@ -2471,8 +2487,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
|
||||
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
|
||||
|
||||
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", GET_QINFO_ADDR(pRuntimeEnv),
|
||||
blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes);
|
||||
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv),
|
||||
blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, pQuery->current->lastKey);
|
||||
|
||||
// while the output buffer is full or limit/offset is applied, query may be paused here
|
||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL | QUERY_COMPLETED)) {
|
||||
|
@ -3374,7 +3390,9 @@ static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus
|
|||
// during reverse scan
|
||||
pTableQueryInfo->lastKey = pStatus->lastKey;
|
||||
pQuery->status = pStatus->status;
|
||||
|
||||
pTableQueryInfo->win = pStatus->w;
|
||||
pQuery->window = pTableQueryInfo->win;
|
||||
}
|
||||
|
||||
void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
||||
|
@ -3396,6 +3414,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
|||
if (pRuntimeEnv->scanFlag == MASTER_SCAN) {
|
||||
qstatus.status = pQuery->status;
|
||||
qstatus.curWindow.ekey = pTableQueryInfo->lastKey - step;
|
||||
qstatus.lastKey = pTableQueryInfo->lastKey;
|
||||
}
|
||||
|
||||
if (!needScanDataBlocksAgain(pRuntimeEnv)) {
|
||||
|
@ -3423,6 +3442,9 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
|||
|
||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||
pRuntimeEnv->scanFlag = REPEAT_SCAN;
|
||||
|
||||
qTrace("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%"PRId64"-%"PRId64, pQInfo,
|
||||
cond.twindow.skey, cond.twindow.ekey);
|
||||
|
||||
// check if query is killed or not
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
|
@ -3707,7 +3729,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
|
|||
int32_t startIdx = 0;
|
||||
int32_t step = -1;
|
||||
|
||||
qTrace("QInfo:%p start to copy data from windowResInfo to query buf", GET_QINFO_ADDR(pQuery));
|
||||
qTrace("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo);
|
||||
int32_t totalSubset = getNumOfSubset(pQInfo);
|
||||
|
||||
if (orderType == TSDB_ORDER_ASC) {
|
||||
|
@ -3836,7 +3858,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
|
|||
}
|
||||
|
||||
/*
|
||||
* There are no results returned to client now.
|
||||
* While the code reaches here, there are no results returned to client now.
|
||||
* If query is not completed yet, the gaps between two results blocks need to be handled after next data block
|
||||
* is retrieved from TSDB.
|
||||
*
|
||||
|
@ -3890,18 +3912,24 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
|
|||
}
|
||||
|
||||
int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t numOfRows, int32_t *numOfInterpo) {
|
||||
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
|
||||
|
||||
while (1) {
|
||||
taosGenerateDataBlock(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata, &pQuery->rec.rows, pQuery->rec.capacity);
|
||||
int32_t ret = pQuery->rec.rows;
|
||||
int32_t ret = taosGenerateDataBlock(pFillInfo, (tFilePage**) pQuery->sdata, pQuery->rec.capacity);
|
||||
|
||||
// todo apply limit output function
|
||||
/* reached the start position of according to offset value, return immediately */
|
||||
if (pQuery->limit.offset == 0) {
|
||||
qTrace("QInfo:%p initial numOfRows:%d, generate filled result:%d rows", pQInfo, pFillInfo->numOfRows, ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (pQuery->limit.offset < ret) {
|
||||
qTrace("QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%d. Discard due to offset, remain:%d, new offset:%d",
|
||||
pQInfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, ret - pQuery->limit.offset, 0);
|
||||
|
||||
ret -= pQuery->limit.offset;
|
||||
// todo !!!!there exactly number of interpo is not valid.
|
||||
// todo refactor move to the beginning of buffer
|
||||
|
@ -3909,10 +3937,16 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
|
|||
memmove(pDst[i]->data, pDst[i]->data + pQuery->pSelectExpr[i].bytes * pQuery->limit.offset,
|
||||
ret * pQuery->pSelectExpr[i].bytes);
|
||||
}
|
||||
|
||||
pQuery->limit.offset = 0;
|
||||
return ret;
|
||||
} else {
|
||||
qTrace("QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%d. Discard due to offset, "
|
||||
"remain:%d, new offset:%d", pQInfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, 0,
|
||||
pQuery->limit.offset - ret);
|
||||
|
||||
pQuery->limit.offset -= ret;
|
||||
pQuery->rec.rows = 0;
|
||||
ret = 0;
|
||||
}
|
||||
|
||||
|
@ -3920,8 +3954,6 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
|
|||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodePrintQueryStatistics(SQInfo *pQInfo) {
|
||||
|
@ -4002,8 +4034,8 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc
|
|||
|
||||
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
|
||||
|
||||
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", GET_QINFO_ADDR(pRuntimeEnv),
|
||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes);
|
||||
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv),
|
||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
|
||||
}
|
||||
|
||||
void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
|
@ -4120,8 +4152,9 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
|
|||
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey, pDataBlock);
|
||||
pRuntimeEnv->windowResInfo.curIndex = index; // restore the window index
|
||||
|
||||
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d",
|
||||
GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes);
|
||||
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64,
|
||||
GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, pQuery->current->lastKey);
|
||||
|
||||
return true;
|
||||
} else { // do nothing
|
||||
*start = tw.skey;
|
||||
|
@ -4215,7 +4248,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
|
|||
pFillCol[i].col.offset = offset;
|
||||
pFillCol[i].flag = TSDB_COL_NORMAL; // always be ta normal column for table query
|
||||
pFillCol[i].functionId = pExprInfo->base.functionId;
|
||||
pFillCol[i].defaultVal.i = pQuery->defaultVal[i];
|
||||
pFillCol[i].fillVal.i = pQuery->fillVal[i];
|
||||
|
||||
offset += pExprInfo->bytes;
|
||||
}
|
||||
|
@ -4591,7 +4624,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|||
skipResults(pRuntimeEnv);
|
||||
|
||||
// the limitation of output result is reached, set the query completed
|
||||
if (limitResults(pQuery)) {
|
||||
if (limitResults(pRuntimeEnv)) {
|
||||
pQInfo->tableIndex = pQInfo->groupInfo.numOfTables;
|
||||
break;
|
||||
}
|
||||
|
@ -4846,7 +4879,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
|
|||
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
|
||||
|
||||
skipResults(pRuntimeEnv);
|
||||
limitResults(pQuery);
|
||||
limitResults(pRuntimeEnv);
|
||||
}
|
||||
|
||||
static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
||||
|
@ -4894,7 +4927,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
|
|||
resetCtxOutputBuf(pRuntimeEnv);
|
||||
}
|
||||
|
||||
limitResults(pQuery);
|
||||
limitResults(pRuntimeEnv);
|
||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
|
||||
qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo,
|
||||
pQuery->current->lastKey, pQuery->window.ekey);
|
||||
|
@ -4972,7 +5005,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
|||
|
||||
// the offset is handled at prepare stage if no interpolation involved
|
||||
if (pQuery->fillType == TSDB_FILL_NONE || pQuery->rec.rows == 0) {
|
||||
limitResults(pQuery);
|
||||
limitResults(pRuntimeEnv);
|
||||
break;
|
||||
} else {
|
||||
TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime,
|
||||
|
@ -4980,11 +5013,10 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
|||
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, ekey);
|
||||
taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata);
|
||||
numOfInterpo = 0;
|
||||
|
||||
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, pQuery->rec.rows, &numOfInterpo);
|
||||
|
||||
qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows);
|
||||
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||
limitResults(pQuery);
|
||||
limitResults(pRuntimeEnv);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -5017,9 +5049,8 @@ static void tableQueryImpl(SQInfo *pQInfo) {
|
|||
int32_t remain = taosNumOfRemainRows(pRuntimeEnv->pFillInfo);
|
||||
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, remain, &numOfInterpo);
|
||||
|
||||
qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows);
|
||||
if (pQuery->rec.rows > 0) {
|
||||
limitResults(pQuery);
|
||||
limitResults(pRuntimeEnv);
|
||||
}
|
||||
|
||||
qTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
||||
|
@ -5351,7 +5382,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
|||
|
||||
pQueryMsg->fillType = htons(pQueryMsg->fillType);
|
||||
if (pQueryMsg->fillType != TSDB_FILL_NONE) {
|
||||
pQueryMsg->defaultVal = (uint64_t)(pMsg);
|
||||
pQueryMsg->fillVal = (uint64_t)(pMsg);
|
||||
|
||||
int64_t *v = (int64_t *)pMsg;
|
||||
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
|
||||
|
@ -5722,13 +5753,13 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
|||
}
|
||||
|
||||
if (pQuery->fillType != TSDB_FILL_NONE) {
|
||||
pQuery->defaultVal = malloc(sizeof(int64_t) * pQuery->numOfOutput);
|
||||
if (pQuery->defaultVal == NULL) {
|
||||
pQuery->fillVal = malloc(sizeof(int64_t) * pQuery->numOfOutput);
|
||||
if (pQuery->fillVal == NULL) {
|
||||
goto _cleanup;
|
||||
}
|
||||
|
||||
// the first column is the timestamp
|
||||
memcpy(pQuery->defaultVal, (char *)pQueryMsg->defaultVal, pQuery->numOfOutput * sizeof(int64_t));
|
||||
memcpy(pQuery->fillVal, (char *)pQueryMsg->fillVal, pQuery->numOfOutput * sizeof(int64_t));
|
||||
}
|
||||
|
||||
// to make sure third party won't overwrite this structure
|
||||
|
@ -5785,7 +5816,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
|||
return pQInfo;
|
||||
|
||||
_cleanup:
|
||||
tfree(pQuery->defaultVal);
|
||||
tfree(pQuery->fillVal);
|
||||
|
||||
if (pQuery->sdata != NULL) {
|
||||
for (int16_t col = 0; col < pQuery->numOfOutput; ++col) {
|
||||
|
@ -5893,8 +5924,8 @@ static void freeQInfo(SQInfo *pQInfo) {
|
|||
tfree(pQuery->pSelectExpr);
|
||||
}
|
||||
|
||||
if (pQuery->defaultVal != NULL) {
|
||||
tfree(pQuery->defaultVal);
|
||||
if (pQuery->fillVal != NULL) {
|
||||
tfree(pQuery->fillVal);
|
||||
}
|
||||
|
||||
// todo refactor, extract method to destroytableDataInfo
|
||||
|
@ -5997,8 +6028,13 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
|
|||
}
|
||||
|
||||
pQuery->rec.total += pQuery->rec.rows;
|
||||
qTrace("QInfo:%p current:%d, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
||||
qTrace("QInfo:%p current numOfRes rows:%d, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
||||
|
||||
if (pQuery->limit.limit > 0 && pQuery->limit.limit == pQuery->rec.total) {
|
||||
qTrace("QInfo:%p results limitation reached, limitation:%"PRId64, pQInfo, pQuery->limit.limit);
|
||||
setQueryStatus(pQuery, QUERY_OVER);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
// todo if interpolation exists, the result may be dump to client by several rounds
|
||||
|
|
|
@ -34,7 +34,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, ch
|
|||
* here we revised the start time of day according to the local time zone,
|
||||
* but in case of DST, the start time of one day need to be dynamically decided.
|
||||
*
|
||||
* TODO dynamically decide the start time of a day
|
||||
* TODO dynamically decide the start time of a day, move to common module
|
||||
*/
|
||||
|
||||
// todo refactor to extract function that is available for Linux/Windows/Mac platform
|
||||
|
@ -116,10 +116,9 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
|
|||
return;
|
||||
}
|
||||
|
||||
pFillInfo->rowIdx = 0;
|
||||
pFillInfo->rowIdx = 0;
|
||||
pFillInfo->endKey = endKey;
|
||||
pFillInfo->numOfRows = numOfRows;
|
||||
|
||||
pFillInfo->endKey = endKey;
|
||||
}
|
||||
|
||||
void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) {
|
||||
|
@ -131,6 +130,8 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput)
|
|||
|
||||
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput) {
|
||||
assert(pFillInfo->numOfRows == pInput->num);
|
||||
int32_t t = 0;
|
||||
|
||||
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
|
@ -138,7 +139,7 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInpu
|
|||
memcpy(pFillInfo->pData[i], s, pInput->num * pCol->col.bytes);
|
||||
|
||||
if (pCol->flag == TSDB_COL_TAG) { // copy the tag value
|
||||
memcpy(pFillInfo->pTags[i], pFillInfo->pData[i], pCol->col.bytes);
|
||||
memcpy(pFillInfo->pTags[t++], pFillInfo->pData[i], pCol->col.bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -170,7 +171,7 @@ static int32_t taosGetTotalNumOfFilledRes(SFillInfo* pFillInfo, const TSKEY* tsA
|
|||
}
|
||||
}
|
||||
|
||||
int32_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows) {
|
||||
int64_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows) {
|
||||
int32_t numOfRes = taosGetTotalNumOfFilledRes(pFillInfo, (int64_t*) pFillInfo->pData[0], numOfRows,
|
||||
pFillInfo->slidingTime, ekey);
|
||||
return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes;
|
||||
|
@ -193,7 +194,7 @@ static double linearInterpolationImpl(double v1, double v2, double k1, double k2
|
|||
int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
*(int32_t*)point->val = linearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key,
|
||||
*(int32_t*)point->val = (int32_t) linearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key,
|
||||
point2->key, point->key);
|
||||
break;
|
||||
}
|
||||
|
@ -209,17 +210,17 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi
|
|||
};
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
*(int64_t*)point->val = linearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key,
|
||||
*(int64_t*)point->val = (int64_t) linearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key,
|
||||
point2->key, point->key);
|
||||
break;
|
||||
};
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
*(int16_t*)point->val = linearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key,
|
||||
*(int16_t*)point->val = (int16_t) linearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key,
|
||||
point2->key, point->key);
|
||||
break;
|
||||
};
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
*(int8_t*)point->val =
|
||||
*(int8_t*) point->val = (int8_t)
|
||||
linearInterpolationImpl(*(int8_t*)point1->val, *(int8_t*)point2->val, point1->key, point2->key, point->key);
|
||||
break;
|
||||
};
|
||||
|
@ -243,8 +244,8 @@ static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, char** pTags, in
|
|||
|
||||
static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData,
|
||||
int64_t ts, char** pTags, bool outOfBound) {
|
||||
char** prevValues = &pFillInfo->prevValues;
|
||||
char** nextValues = &pFillInfo->nextValues;
|
||||
char* prevValues = pFillInfo->prevValues;
|
||||
char* nextValues = pFillInfo->nextValues;
|
||||
|
||||
SPoint point1, point2, point;
|
||||
|
||||
|
@ -257,16 +258,21 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t*
|
|||
|
||||
// set the other values
|
||||
if (pFillInfo->fillType == TSDB_FILL_PREV) {
|
||||
char* pInterpolationData = FILL_IS_ASC_FILL(pFillInfo) ? *prevValues : *nextValues;
|
||||
if (pInterpolationData != NULL) {
|
||||
char* p = FILL_IS_ASC_FILL(pFillInfo) ? prevValues : nextValues;
|
||||
|
||||
if (p != NULL) {
|
||||
for (int32_t i = 1; i < numOfValCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
|
||||
if (isNull(pInterpolationData + pCol->col.offset, pCol->col.type)) {
|
||||
setNull(val1, pCol->col.type, pCol->col.bytes);
|
||||
if (isNull(p + pCol->col.offset, pCol->col.type)) {
|
||||
if (pCol->col.type == TSDB_DATA_TYPE_BINARY || pCol->col.type == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(val1, pCol->col.type);
|
||||
} else {
|
||||
setNull(val1, pCol->col.type, pCol->col.bytes);
|
||||
}
|
||||
} else {
|
||||
assignVal(val1, pInterpolationData + pCol->col.offset, pCol->col.bytes, pCol->col.type);
|
||||
assignVal(val1, p + pCol->col.offset, pCol->col.bytes, pCol->col.type);
|
||||
}
|
||||
}
|
||||
} else { // no prev value yet, set the value for NULL
|
||||
|
@ -274,14 +280,18 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t*
|
|||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
|
||||
setNull(val1, pCol->col.type, pCol->col.bytes);
|
||||
if (pCol->col.type == TSDB_DATA_TYPE_BINARY||pCol->col.type == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(val1, pCol->col.type);
|
||||
} else {
|
||||
setNull(val1, pCol->col.type, pCol->col.bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
|
||||
} else if (pFillInfo->fillType == TSDB_FILL_LINEAR) {
|
||||
// TODO : linear interpolation supports NULL value
|
||||
if (*prevValues != NULL && !outOfBound) {
|
||||
if (prevValues != NULL && !outOfBound) {
|
||||
for (int32_t i = 1; i < numOfValCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
|
@ -289,14 +299,17 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t*
|
|||
int16_t bytes = pCol->col.bytes;
|
||||
|
||||
char *val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) {
|
||||
if (type == TSDB_DATA_TYPE_BINARY|| type == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(val1, pCol->col.type);
|
||||
continue;
|
||||
} else if (type == TSDB_DATA_TYPE_BOOL) {
|
||||
setNull(val1, pCol->col.type, bytes);
|
||||
continue;
|
||||
}
|
||||
|
||||
point1 = (SPoint){.key = *(TSKEY*)(*prevValues), .val = *prevValues + pCol->col.offset};
|
||||
point1 = (SPoint){.key = *(TSKEY*)(prevValues), .val = prevValues + pCol->col.offset};
|
||||
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->rowIdx * bytes};
|
||||
point = (SPoint){.key = pFillInfo->start, .val = val1};
|
||||
point = (SPoint){.key = pFillInfo->start, .val = val1};
|
||||
taosDoLinearInterpolation(type, &point1, &point2, &point);
|
||||
}
|
||||
|
||||
|
@ -307,7 +320,12 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t*
|
|||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
|
||||
setNull(val1, pCol->col.type, pCol->col.bytes);
|
||||
|
||||
if (pCol->col.type == TSDB_DATA_TYPE_BINARY || pCol->col.type == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(val1, pCol->col.type);
|
||||
} else {
|
||||
setNull(val1, pCol->col.type, pCol->col.bytes);
|
||||
}
|
||||
}
|
||||
|
||||
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
|
||||
|
@ -318,7 +336,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t*
|
|||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
|
||||
assignVal(val1, (char*)&pCol->defaultVal.i, pCol->col.bytes, pCol->col.type);
|
||||
assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
|
||||
}
|
||||
|
||||
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
|
||||
|
@ -338,11 +356,16 @@ static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** nextValues) {
|
|||
*nextValues = calloc(1, pFillInfo->rowSize);
|
||||
for (int i = 1; i < pFillInfo->numOfCols; i++) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
setNull(*nextValues + pCol->col.offset, pCol->col.type, pCol->col.bytes);
|
||||
|
||||
if (pCol->col.type == TSDB_DATA_TYPE_BINARY||pCol->col.type == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(*nextValues + pCol->col.offset, pCol->col.type);
|
||||
} else {
|
||||
setNull(*nextValues + pCol->col.offset, pCol->col.type, pCol->col.bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfRows, int32_t outputRows, char** srcData) {
|
||||
int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfRows, int32_t outputRows, char** srcData) {
|
||||
int32_t num = 0;
|
||||
pFillInfo->numOfCurrent = 0;
|
||||
|
||||
|
@ -356,8 +379,8 @@ int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numO
|
|||
|
||||
if (numOfRows == 0) {
|
||||
/*
|
||||
* we need to rebuild whole result set
|
||||
* NOTE:we need to keep the last saved data, to generated the filled data
|
||||
* These data are generated according to fill strategy, since the current timestamp is out of time window of
|
||||
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
|
||||
*/
|
||||
while (num < outputRows) {
|
||||
doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, true);
|
||||
|
@ -387,7 +410,7 @@ int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numO
|
|||
|
||||
while (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||
(pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) {
|
||||
doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, false);
|
||||
doInterpoResultImpl(pFillInfo, data, &num, srcData, ts, pTags, false);
|
||||
}
|
||||
|
||||
/* output buffer is full, abort */
|
||||
|
@ -420,7 +443,7 @@ int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numO
|
|||
assignVal(val1, src, pCol->col.bytes, pCol->col.type);
|
||||
memcpy(*prevValues + pCol->col.offset, src, pCol->col.bytes);
|
||||
} else {
|
||||
assignVal(val1, (char*) &pCol->defaultVal.i, pCol->col.bytes, pCol->col.type);
|
||||
assignVal(val1, (char*) &pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -450,21 +473,12 @@ int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numO
|
|||
}
|
||||
}
|
||||
|
||||
void taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int64_t* outputRows, int32_t capacity) {
|
||||
int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) {
|
||||
int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator?
|
||||
|
||||
// TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime,
|
||||
// pQuery->slidingTimeUnit, pQuery->precision);
|
||||
// if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
// assert(ekey >= pQuery->window.ekey);
|
||||
// } else {
|
||||
// assert(ekey <= pQuery->window.ekey);
|
||||
// }
|
||||
|
||||
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity);
|
||||
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity);
|
||||
|
||||
int32_t numOfRes = taosDoInterpoResult(pFillInfo, output, remain, rows, pFillInfo->pData);
|
||||
*outputRows = rows;
|
||||
|
||||
int32_t numOfRes = generateDataBlockImpl(pFillInfo, output, remain, rows, pFillInfo->pData);
|
||||
assert(numOfRes == rows);
|
||||
|
||||
return numOfRes;
|
||||
}
|
||||
|
|
|
@ -101,11 +101,12 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
|
|||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_NCHAR: { // here we get the nchar length from raw binary bits length
|
||||
pVar->nLen = len / TSDB_NCHAR_SIZE;
|
||||
pVar->wpz = calloc(1, (pVar->nLen + 1) * TSDB_NCHAR_SIZE);
|
||||
int32_t lenInwchar = len / TSDB_NCHAR_SIZE;
|
||||
pVar->wpz = calloc(1, (lenInwchar + 1) * TSDB_NCHAR_SIZE);
|
||||
|
||||
wcsncpy(pVar->wpz, (wchar_t *)pz, pVar->nLen);
|
||||
pVar->wpz[pVar->nLen] = 0;
|
||||
wcsncpy(pVar->wpz, (wchar_t *)pz, lenInwchar);
|
||||
pVar->wpz[lenInwchar] = 0;
|
||||
pVar->nLen = len;
|
||||
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -1539,14 +1539,21 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
|
|||
tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL);
|
||||
|
||||
size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
|
||||
memset(pHandle->statis, 0, sizeof(SDataStatis) * numOfCols);
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SDataStatis* st = &pHandle->statis[i];
|
||||
int32_t colId = st->colId;
|
||||
|
||||
memset(st, 0, sizeof(SDataStatis));
|
||||
st->colId = colId;
|
||||
}
|
||||
|
||||
tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols);
|
||||
|
||||
*pBlockStatis = pHandle->statis;
|
||||
|
||||
//update the number of NULL data rows
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
if (pHandle->statis[i].numOfNull == -1) {
|
||||
if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
|
||||
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ if $data01 != 2 then
|
|||
return -1
|
||||
endi
|
||||
if $data02 != tb2 then
|
||||
print expect tb2, actual: $data02
|
||||
return -1
|
||||
endi
|
||||
if $data03 != tb2 then
|
||||
|
|
Loading…
Reference in New Issue