[TD-225] refactor.

This commit is contained in:
Haojun Liao 2020-12-12 15:21:59 +08:00
parent d9c09bde8b
commit bcc9d7bc9f
2 changed files with 19 additions and 19 deletions

View File

@ -24,6 +24,9 @@
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
int32_t getOutputInterResultBufSize(SQuery* pQuery); int32_t getOutputInterResultBufSize(SQuery* pQuery);
void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pRow, int16_t type); void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pRow, int16_t type);
@ -47,9 +50,6 @@ static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pWindowResInfo, int
return pWindowResInfo->pResult[slot]; return pWindowResInfo->pResult[slot];
} }
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
bool isWindowResClosed(SResultRowInfo *pWindowResInfo, int32_t slot); bool isWindowResClosed(SResultRowInfo *pWindowResInfo, int32_t slot);
int32_t initResultRow(SResultRow *pResultRow); int32_t initResultRow(SResultRow *pResultRow);

View File

@ -464,13 +464,13 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis
return true; return true;
} }
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo, char *pData, static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData,
int16_t bytes, bool masterscan, uint64_t uid) { int16_t bytes, bool masterscan, uint64_t uid) {
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
int32_t *p1 = int32_t *p1 =
(int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (p1 != NULL) { if (p1 != NULL) {
pWindowResInfo->curIndex = *p1; pResultRowInfo->curIndex = *p1;
} else { } else {
if (!masterscan) { // not master scan, do not add new timewindow if (!masterscan) { // not master scan, do not add new timewindow
return NULL; return NULL;
@ -478,46 +478,46 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
// TODO refactor // TODO refactor
// more than the capacity, reallocate the resources // more than the capacity, reallocate the resources
if (pWindowResInfo->size >= pWindowResInfo->capacity) { if (pResultRowInfo->size >= pResultRowInfo->capacity) {
int64_t newCapacity = 0; int64_t newCapacity = 0;
if (pWindowResInfo->capacity > 10000) { if (pResultRowInfo->capacity > 10000) {
newCapacity = (int64_t)(pWindowResInfo->capacity * 1.25); newCapacity = (int64_t)(pResultRowInfo->capacity * 1.25);
} else { } else {
newCapacity = (int64_t)(pWindowResInfo->capacity * 1.5); newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5);
} }
char *t = realloc(pWindowResInfo->pResult, (size_t)(newCapacity * POINTER_BYTES)); char *t = realloc(pResultRowInfo->pResult, (size_t)(newCapacity * POINTER_BYTES));
if (t == NULL) { if (t == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
pWindowResInfo->pResult = (SResultRow **)t; pResultRowInfo->pResult = (SResultRow **)t;
int32_t inc = (int32_t)newCapacity - pWindowResInfo->capacity; int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity;
memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, POINTER_BYTES * inc); memset(&pResultRowInfo->pResult[pResultRowInfo->capacity], 0, POINTER_BYTES * inc);
pWindowResInfo->capacity = (int32_t)newCapacity; pResultRowInfo->capacity = (int32_t)newCapacity;
} }
SResultRow *pResult = getNewResultRow(pRuntimeEnv->pool); SResultRow *pResult = getNewResultRow(pRuntimeEnv->pool);
pWindowResInfo->pResult[pWindowResInfo->size] = pResult; pResultRowInfo->pResult[pResultRowInfo->size] = pResult;
int32_t ret = initResultRow(pResult); int32_t ret = initResultRow(pResult);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
// add a new result set for a new group // add a new result set for a new group
pWindowResInfo->curIndex = pWindowResInfo->size++; pResultRowInfo->curIndex = pResultRowInfo->size++;
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes),
(char *)&pWindowResInfo->curIndex, sizeof(int32_t)); (char *)&pResultRowInfo->curIndex, sizeof(int32_t));
} }
// too many time window in query // too many time window in query
if (pWindowResInfo->size > MAX_INTERVAL_TIME_WINDOW) { if (pResultRowInfo->size > MAX_INTERVAL_TIME_WINDOW) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
} }
return getResultRow(pWindowResInfo, pWindowResInfo->curIndex); return getResultRow(pResultRowInfo, pResultRowInfo->curIndex);
} }
// get the correct time window according to the handled timestamp // get the correct time window according to the handled timestamp