[td-225] refactor code.
This commit is contained in:
parent
c07c7ca74f
commit
ed458e104b
|
@ -364,7 +364,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
|
||||||
|
|
||||||
TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
|
TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
|
||||||
int64_t revisedSTime =
|
int64_t revisedSTime =
|
||||||
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
|
taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
|
||||||
|
|
||||||
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
|
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
|
||||||
SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
|
SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
|
||||||
|
@ -831,7 +831,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
|
||||||
if (pFillInfo != NULL) {
|
if (pFillInfo != NULL) {
|
||||||
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
|
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
|
||||||
int64_t revisedSTime =
|
int64_t revisedSTime =
|
||||||
taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
|
taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
|
||||||
|
|
||||||
taosResetFillInfo(pFillInfo, revisedSTime);
|
taosResetFillInfo(pFillInfo, revisedSTime);
|
||||||
}
|
}
|
||||||
|
@ -1301,9 +1301,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
|
||||||
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
|
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
|
||||||
TSKEY skey = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
|
TSKEY skey = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
|
||||||
int64_t newTime =
|
int64_t newTime =
|
||||||
taosGetIntervalStartTimestamp(skey, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
|
taosGetIntervalStartTimestamp(skey, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
|
||||||
// taosResetFillInfo(pLocalReducer->pFillInfo, pQueryInfo->order.order, newTime,
|
|
||||||
// pQueryInfo->groupbyExpr.numOfGroupCols, 4096, 0, NULL, pLocalReducer->rowSize);
|
|
||||||
taosResetFillInfo(pLocalReducer->pFillInfo, newTime);
|
taosResetFillInfo(pLocalReducer->pFillInfo, newTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,4 +29,6 @@ bool tscValidateTableNameLength(size_t len);
|
||||||
|
|
||||||
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
|
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
|
||||||
|
|
||||||
|
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision);
|
||||||
|
|
||||||
#endif // TDENGINE_NAME_H
|
#endif // TDENGINE_NAME_H
|
||||||
|
|
|
@ -75,3 +75,33 @@ SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numO
|
||||||
|
|
||||||
return pFilter;
|
return pFilter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) {
|
||||||
|
if (slidingTime == 0) {
|
||||||
|
return startTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t start = ((startTime - intervalTime) / slidingTime + 1) * slidingTime;
|
||||||
|
if (!(timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h')) {
|
||||||
|
/*
|
||||||
|
* here we revised the start time of day according to the local time zone,
|
||||||
|
* but in case of DST, the start time of one day need to be dynamically decided.
|
||||||
|
*/
|
||||||
|
// todo refactor to extract function that is available for Linux/Windows/Mac platform
|
||||||
|
#if defined(WINDOWS) && _MSC_VER >= 1900
|
||||||
|
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
|
||||||
|
int64_t timezone = _timezone;
|
||||||
|
int32_t daylight = _daylight;
|
||||||
|
char** tzname = _tzname;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L;
|
||||||
|
start += timezone * t;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t end = start + intervalTime - 1;
|
||||||
|
if (end < startTime) {
|
||||||
|
start += slidingTime;
|
||||||
|
}
|
||||||
|
return start;
|
||||||
|
}
|
||||||
|
|
|
@ -60,8 +60,6 @@ typedef struct SPoint {
|
||||||
void * val;
|
void * val;
|
||||||
} SPoint;
|
} SPoint;
|
||||||
|
|
||||||
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision);
|
|
||||||
|
|
||||||
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||||
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
|
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
|
||||||
SFillColInfo* pFillCol);
|
SFillColInfo* pFillCol);
|
||||||
|
|
|
@ -1681,8 +1681,7 @@ static bool onlyQueryTags(SQuery* pQuery) {
|
||||||
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *realWin, STimeWindow *win) {
|
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *realWin, STimeWindow *win) {
|
||||||
assert(key >= keyFirst && key <= keyLast && pQuery->slidingTime <= pQuery->intervalTime);
|
assert(key >= keyFirst && key <= keyLast && pQuery->slidingTime <= pQuery->intervalTime);
|
||||||
|
|
||||||
win->skey = taosGetIntervalStartTimestamp(key, pQuery->slidingTime, pQuery->slidingTimeUnit, pQuery->precision);
|
win->skey = taosGetIntervalStartTimestamp(key, pQuery->slidingTime, pQuery->intervalTime, pQuery->slidingTimeUnit, pQuery->precision);
|
||||||
|
|
||||||
if (keyFirst > (INT64_MAX - pQuery->intervalTime)) {
|
if (keyFirst > (INT64_MAX - pQuery->intervalTime)) {
|
||||||
/*
|
/*
|
||||||
* if the realSkey > INT64_MAX - pQuery->intervalTime, the query duration between
|
* if the realSkey > INT64_MAX - pQuery->intervalTime, the query duration between
|
||||||
|
@ -3664,11 +3663,11 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
|
||||||
assert(pQuery->rec.rows <= pQuery->rec.capacity);
|
assert(pQuery->rec.rows <= pQuery->rec.capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
static UNUSED_FUNC void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
|
static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
// update the number of result for each, only update the number of rows for the corresponding window result.
|
// update the number of result for each, only update the number of rows for the corresponding window result.
|
||||||
if (pQuery->intervalTime == 0) {
|
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < pRuntimeEnv->windowResInfo.size; ++i) {
|
for (int32_t i = 0; i < pRuntimeEnv->windowResInfo.size; ++i) {
|
||||||
SWindowResult *pResult = &pRuntimeEnv->windowResInfo.pResult[i];
|
SWindowResult *pResult = &pRuntimeEnv->windowResInfo.pResult[i];
|
||||||
|
@ -3682,14 +3681,6 @@ static UNUSED_FUNC void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, S
|
||||||
pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes);
|
pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// int32_t g = pTableQueryInfo->groupIndex;
|
|
||||||
// assert(pRuntimeEnv->windowResInfo.size > 0);
|
|
||||||
//
|
|
||||||
// SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&g, sizeof(g));
|
|
||||||
// if (pWindowRes->numOfRows == 0) {
|
|
||||||
// pWindowRes->numOfRows = getNumOfResult(pRuntimeEnv);
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4258,7 +4249,10 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
||||||
} else { // interval query
|
} else { // interval query
|
||||||
TSKEY nextKey = blockInfo.window.skey;
|
TSKEY nextKey = blockInfo.window.skey;
|
||||||
setIntervalQueryRange(pQInfo, nextKey);
|
setIntervalQueryRange(pQInfo, nextKey);
|
||||||
/*int32_t ret = */setAdditionalInfo(pQInfo, (*pTableQueryInfo)->pTable, *pTableQueryInfo);
|
|
||||||
|
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTSBuf != NULL) {
|
||||||
|
setAdditionalInfo(pQInfo, (*pTableQueryInfo)->pTable, *pTableQueryInfo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4658,6 +4652,8 @@ static void doRestoreContext(SQInfo *pQInfo) {
|
||||||
static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
|
static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
|
||||||
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
|
|
||||||
if (isIntervalQuery(pQuery)) {
|
if (isIntervalQuery(pQuery)) {
|
||||||
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
|
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
for (int32_t i = 0; i < numOfGroup; ++i) {
|
for (int32_t i = 0; i < numOfGroup; ++i) {
|
||||||
|
@ -4667,6 +4663,7 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
|
||||||
for (int32_t j = 0; j < num; ++j) {
|
for (int32_t j = 0; j < num; ++j) {
|
||||||
STableQueryInfo* item = taosArrayGetP(group, j);
|
STableQueryInfo* item = taosArrayGetP(group, j);
|
||||||
closeAllTimeWindow(&item->windowResInfo);
|
closeAllTimeWindow(&item->windowResInfo);
|
||||||
|
removeRedundantWindow(&item->windowResInfo, item->lastKey - step, step);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else { // close results for group result
|
} else { // close results for group result
|
||||||
|
|
|
@ -22,41 +22,6 @@
|
||||||
|
|
||||||
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
|
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
|
||||||
|
|
||||||
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision) {
|
|
||||||
if (slidingTime == 0) {
|
|
||||||
return startTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h') {
|
|
||||||
return (startTime / slidingTime) * slidingTime;
|
|
||||||
} else {
|
|
||||||
/*
|
|
||||||
* here we revised the start time of day according to the local time zone,
|
|
||||||
* but in case of DST, the start time of one day need to be dynamically decided.
|
|
||||||
*
|
|
||||||
* TODO dynamically decide the start time of a day, move to common module
|
|
||||||
*/
|
|
||||||
|
|
||||||
// todo refactor to extract function that is available for Linux/Windows/Mac platform
|
|
||||||
#if defined(WINDOWS) && _MSC_VER >= 1900
|
|
||||||
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
|
|
||||||
int64_t timezone = _timezone;
|
|
||||||
int32_t daylight = _daylight;
|
|
||||||
char** tzname = _tzname;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L;
|
|
||||||
|
|
||||||
int64_t revStartime = (startTime / slidingTime) * slidingTime + timezone * t;
|
|
||||||
int64_t revEndtime = revStartime + slidingTime - 1;
|
|
||||||
if (revEndtime < startTime) {
|
|
||||||
revStartime += slidingTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
return revStartime;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||||
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, SFillColInfo* pFillCol) {
|
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, SFillColInfo* pFillCol) {
|
||||||
if (fillType == TSDB_FILL_NONE) {
|
if (fillType == TSDB_FILL_NONE) {
|
||||||
|
@ -128,7 +93,7 @@ static TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterva
|
||||||
if (order == TSDB_ORDER_ASC) {
|
if (order == TSDB_ORDER_ASC) {
|
||||||
return ekey;
|
return ekey;
|
||||||
} else {
|
} else {
|
||||||
return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision);
|
return taosGetIntervalStartTimestamp(ekey, timeInterval, timeInterval, slidingTimeUnit, precision);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -110,13 +110,13 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
|
|
||||||
// current connect is broken
|
// current connect is broken
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
// add lock here
|
|
||||||
handle = qRegisterQInfo(pVnode->qMgmt, pQInfo);
|
handle = qRegisterQInfo(pVnode->qMgmt, pQInfo);
|
||||||
if (handle == NULL) { // failed to register qhandle
|
if (handle == NULL) { // failed to register qhandle
|
||||||
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
|
|
||||||
qKillQuery(pQInfo);
|
qKillQuery(pQInfo);
|
||||||
qKillQuery(pQInfo);
|
qKillQuery(pQInfo);
|
||||||
|
pQInfo = NULL;
|
||||||
} else {
|
} else {
|
||||||
assert(*handle == pQInfo);
|
assert(*handle == pQInfo);
|
||||||
pRsp->qhandle = htobe64((uint64_t) (handle));
|
pRsp->qhandle = htobe64((uint64_t) (handle));
|
||||||
|
|
Loading…
Reference in New Issue