[TD-225] refactor codes.

This commit is contained in:
Haojun Liao 2020-12-03 10:33:40 +08:00
parent d59b8f18da
commit caeabf3821
5 changed files with 72 additions and 68 deletions

View File

@ -250,7 +250,10 @@ typedef struct SQueryInfo {
int64_t * fillVal; // default value for fill int64_t * fillVal; // default value for fill
char * msg; // pointer to the pCmd->payload to keep error message temporarily char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t tableLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id int16_t resColumnId; // result column id
} SQueryInfo; } SQueryInfo;

View File

@ -5308,6 +5308,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
// keep original limitation value in globalLimit // keep original limitation value in globalLimit
pQueryInfo->clauseLimit = pQueryInfo->limit.limit; pQueryInfo->clauseLimit = pQueryInfo->limit.limit;
pQueryInfo->prjOffset = pQueryInfo->limit.offset; pQueryInfo->prjOffset = pQueryInfo->limit.offset;
pQueryInfo->tableLimit = -1;
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
/* /*
@ -5317,6 +5318,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
* than or equal to the value of limit. * than or equal to the value of limit.
*/ */
if (pQueryInfo->limit.limit > 0) { if (pQueryInfo->limit.limit > 0) {
pQueryInfo->tableLimit = pQueryInfo->limit.limit + pQueryInfo->limit.offset;
pQueryInfo->limit.limit = -1; pQueryInfo->limit.limit = -1;
} }

View File

@ -697,6 +697,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->queryType = htonl(pQueryInfo->type); pQueryMsg->queryType = htonl(pQueryInfo->type);
pQueryMsg->tableLimit = htobe64(pQueryInfo->tableLimit);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number

View File

@ -6260,11 +6260,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->interval.interval = htobe64(pQueryMsg->interval.interval); pQueryMsg->interval.interval = htobe64(pQueryMsg->interval.interval);
pQueryMsg->interval.sliding = htobe64(pQueryMsg->interval.sliding); pQueryMsg->interval.sliding = htobe64(pQueryMsg->interval.sliding);
pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset); pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset);
// pQueryMsg->interval.intervalUnit = pQueryMsg->interval.intervalUnit;
// pQueryMsg->interval.slidingUnit = pQueryMsg->interval.slidingUnit;
// pQueryMsg->interval.offsetUnit = pQueryMsg->interval.offsetUnit;
pQueryMsg->limit = htobe64(pQueryMsg->limit); pQueryMsg->limit = htobe64(pQueryMsg->limit);
pQueryMsg->offset = htobe64(pQueryMsg->offset); pQueryMsg->offset = htobe64(pQueryMsg->offset);
pQueryMsg->tableLimit = htobe64(pQueryMsg->tableLimit);
pQueryMsg->order = htons(pQueryMsg->order); pQueryMsg->order = htons(pQueryMsg->order);
pQueryMsg->orderColId = htons(pQueryMsg->orderColId); pQueryMsg->orderColId = htons(pQueryMsg->orderColId);

View File

@ -43,48 +43,48 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) {
return size; return size;
} }
int32_t initWindowResInfo(SResultRowInfo *pWindowResInfo, int32_t size, int16_t type) { int32_t initWindowResInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) {
pWindowResInfo->capacity = size; pResultRowInfo->capacity = size;
pWindowResInfo->type = type; pResultRowInfo->type = type;
pWindowResInfo->curIndex = -1; pResultRowInfo->curIndex = -1;
pWindowResInfo->size = 0; pResultRowInfo->size = 0;
pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
pWindowResInfo->pResult = calloc(pWindowResInfo->capacity, POINTER_BYTES); pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES);
if (pWindowResInfo->pResult == NULL) { if (pResultRowInfo->pResult == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void cleanupTimeWindowInfo(SResultRowInfo *pWindowResInfo) { void cleanupTimeWindowInfo(SResultRowInfo *pResultRowInfo) {
if (pWindowResInfo == NULL) { if (pResultRowInfo == NULL) {
return; return;
} }
if (pWindowResInfo->capacity == 0) { if (pResultRowInfo->capacity == 0) {
assert(pWindowResInfo->pResult == NULL); assert(pResultRowInfo->pResult == NULL);
return; return;
} }
if (pWindowResInfo->type == TSDB_DATA_TYPE_BINARY || pWindowResInfo->type == TSDB_DATA_TYPE_NCHAR) { if (pResultRowInfo->type == TSDB_DATA_TYPE_BINARY || pResultRowInfo->type == TSDB_DATA_TYPE_NCHAR) {
for(int32_t i = 0; i < pWindowResInfo->size; ++i) { for(int32_t i = 0; i < pResultRowInfo->size; ++i) {
tfree(pWindowResInfo->pResult[i]->key); tfree(pResultRowInfo->pResult[i]->key);
} }
} }
tfree(pWindowResInfo->pResult); tfree(pResultRowInfo->pResult);
} }
void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo) { void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) {
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) {
return; return;
} }
for (int32_t i = 0; i < pWindowResInfo->size; ++i) { for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
SResultRow *pWindowRes = pWindowResInfo->pResult[i]; SResultRow *pWindowRes = pResultRowInfo->pResult[i];
clearResultRow(pRuntimeEnv, pWindowRes, pWindowResInfo->type); clearResultRow(pRuntimeEnv, pWindowRes, pResultRowInfo->type);
int32_t groupIndex = 0; int32_t groupIndex = 0;
int64_t uid = 0; int64_t uid = 0;
@ -93,30 +93,30 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowR
taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupIndex))); taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupIndex)));
} }
pWindowResInfo->curIndex = -1; pResultRowInfo->curIndex = -1;
pWindowResInfo->size = 0; pResultRowInfo->size = 0;
pWindowResInfo->startTime = TSKEY_INITIAL_VAL; pResultRowInfo->startTime = TSKEY_INITIAL_VAL;
pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
} }
void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SResultRowInfo *pResultRowInfo = &pRuntimeEnv->windowResInfo;
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0 || num == 0) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0 || num == 0) {
return; return;
} }
int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo); int32_t numOfClosed = numOfClosedTimeWindow(pResultRowInfo);
assert(num >= 0 && num <= numOfClosed); assert(num >= 0 && num <= numOfClosed);
int16_t type = pWindowResInfo->type; int16_t type = pResultRowInfo->type;
int64_t uid = getResultInfoUId(pRuntimeEnv); int64_t uid = getResultInfoUId(pRuntimeEnv);
char *key = NULL; char *key = NULL;
int16_t bytes = -1; int16_t bytes = -1;
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SResultRow *pResult = pWindowResInfo->pResult[i]; SResultRow *pResult = pResultRowInfo->pResult[i];
if (pResult->closed) { // remove the window slot from hash table if (pResult->closed) { // remove the window slot from hash table
getResultRowKeyInfo(pResult, type, &key, &bytes); getResultRowKeyInfo(pResult, type, &key, &bytes);
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
@ -126,23 +126,23 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
} }
} }
int32_t remain = pWindowResInfo->size - num; int32_t remain = pResultRowInfo->size - num;
// clear all the closed windows from the window list // clear all the closed windows from the window list
for (int32_t k = 0; k < remain; ++k) { for (int32_t k = 0; k < remain; ++k) {
copyResultRow(pRuntimeEnv, pWindowResInfo->pResult[k], pWindowResInfo->pResult[num + k], type); copyResultRow(pRuntimeEnv, pResultRowInfo->pResult[k], pResultRowInfo->pResult[num + k], type);
} }
// move the unclosed window in the front of the window list // move the unclosed window in the front of the window list
for (int32_t k = remain; k < pWindowResInfo->size; ++k) { for (int32_t k = remain; k < pResultRowInfo->size; ++k) {
SResultRow *pWindowRes = pWindowResInfo->pResult[k]; SResultRow *pWindowRes = pResultRowInfo->pResult[k];
clearResultRow(pRuntimeEnv, pWindowRes, pWindowResInfo->type); clearResultRow(pRuntimeEnv, pWindowRes, pResultRowInfo->type);
} }
pWindowResInfo->size = remain; pResultRowInfo->size = remain;
for (int32_t k = 0; k < pWindowResInfo->size; ++k) { for (int32_t k = 0; k < pResultRowInfo->size; ++k) {
SResultRow *pResult = pWindowResInfo->pResult[k]; SResultRow *pResult = pResultRowInfo->pResult[k];
getResultRowKeyInfo(pResult, type, &key, &bytes); getResultRowKeyInfo(pResult, type, &key, &bytes);
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
@ -150,43 +150,43 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
assert(p != NULL); assert(p != NULL);
int32_t v = (*p - num); int32_t v = (*p - num);
assert(v >= 0 && v <= pWindowResInfo->size); assert(v >= 0 && v <= pResultRowInfo->size);
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t)); taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t));
} }
pWindowResInfo->curIndex = -1; pResultRowInfo->curIndex = -1;
} }
void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) { void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) {
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SResultRowInfo *pResultRowInfo = &pRuntimeEnv->windowResInfo;
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0) {
return; return;
} }
int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo); int32_t numOfClosed = numOfClosedTimeWindow(pResultRowInfo);
clearFirstNWindowRes(pRuntimeEnv, numOfClosed); clearFirstNWindowRes(pRuntimeEnv, numOfClosed);
} }
int32_t numOfClosedTimeWindow(SResultRowInfo *pWindowResInfo) { int32_t numOfClosedTimeWindow(SResultRowInfo *pResultRowInfo) {
int32_t i = 0; int32_t i = 0;
while (i < pWindowResInfo->size && pWindowResInfo->pResult[i]->closed) { while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) {
++i; ++i;
} }
return i; return i;
} }
void closeAllTimeWindow(SResultRowInfo *pWindowResInfo) { void closeAllTimeWindow(SResultRowInfo *pResultRowInfo) {
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size); assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size);
for (int32_t i = 0; i < pWindowResInfo->size; ++i) { for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
if (pWindowResInfo->pResult[i]->closed) { if (pResultRowInfo->pResult[i]->closed) {
continue; continue;
} }
pWindowResInfo->pResult[i]->closed = true; pResultRowInfo->pResult[i]->closed = true;
} }
} }
@ -195,41 +195,41 @@ void closeAllTimeWindow(SResultRowInfo *pWindowResInfo) {
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time. * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
* NOTE: remove redundant, only when the result set order equals to traverse order * NOTE: remove redundant, only when the result set order equals to traverse order
*/ */
void removeRedundantWindow(SResultRowInfo *pWindowResInfo, TSKEY lastKey, int32_t order) { void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order) {
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size); assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size);
if (pWindowResInfo->size <= 1) { if (pResultRowInfo->size <= 1) {
return; return;
} }
// get the result order // get the result order
int32_t resultOrder = (pWindowResInfo->pResult[0]->win.skey < pWindowResInfo->pResult[1]->win.skey)? 1:-1; int32_t resultOrder = (pResultRowInfo->pResult[0]->win.skey < pResultRowInfo->pResult[1]->win.skey)? 1:-1;
if (order != resultOrder) { if (order != resultOrder) {
return; return;
} }
int32_t i = 0; int32_t i = 0;
if (order == QUERY_ASC_FORWARD_STEP) { if (order == QUERY_ASC_FORWARD_STEP) {
TSKEY ekey = pWindowResInfo->pResult[i]->win.ekey; TSKEY ekey = pResultRowInfo->pResult[i]->win.ekey;
while (i < pWindowResInfo->size && (ekey < lastKey)) { while (i < pResultRowInfo->size && (ekey < lastKey)) {
++i; ++i;
} }
} else if (order == QUERY_DESC_FORWARD_STEP) { } else if (order == QUERY_DESC_FORWARD_STEP) {
while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i]->win.skey > lastKey)) { while (i < pResultRowInfo->size && (pResultRowInfo->pResult[i]->win.skey > lastKey)) {
++i; ++i;
} }
} }
if (i < pWindowResInfo->size) { if (i < pResultRowInfo->size) {
pWindowResInfo->size = (i + 1); pResultRowInfo->size = (i + 1);
} }
} }
bool isWindowResClosed(SResultRowInfo *pWindowResInfo, int32_t slot) { bool isWindowResClosed(SResultRowInfo *pResultRowInfo, int32_t slot) {
return (getResultRow(pWindowResInfo, slot)->closed == true); return (getResultRow(pResultRowInfo, slot)->closed == true);
} }
void closeTimeWindow(SResultRowInfo *pWindowResInfo, int32_t slot) { void closeTimeWindow(SResultRowInfo *pResultRowInfo, int32_t slot) {
getResultRow(pWindowResInfo, slot)->closed = true; getResultRow(pResultRowInfo, slot)->closed = true;
} }
void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes, int16_t type) { void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes, int16_t type) {