[td-2895] refactor.

This commit is contained in:
Haojun Liao 2021-03-12 16:13:00 +08:00
parent aa063917ab
commit 0956e1b4df
2 changed files with 30 additions and 11 deletions

View File

@ -503,9 +503,19 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSqlCmd *pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single tabel subscription
pQueryInfo->window.skey = ((SSubscriptionProgress*)taosArrayGet(pSub->progress, 0))->key;
tscDebug("subscribe:%s set subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey);
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single table subscription
size_t size = taosArrayGetSize(pSub->progress);
TSKEY s = INT64_MAX;
for(int32_t i = 0; i < size; ++i) {
TSKEY k = ((SSubscriptionProgress*)taosArrayGet(pSub->progress, i))->key;
if (s > k) {
s = k;
}
}
pQueryInfo->window.skey = s;
tscDebug("subscribe:%s set next round subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey);
}
if (pSub->pTimer == NULL) {

View File

@ -3976,7 +3976,10 @@ static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo) {
return tidInfo;
}
static void updateTableIdInfo(STableQueryInfo* pTableQueryInfo, SHashObj* pTableIdInfo) {
static void updateTableIdInfo(STableQueryInfo* pTableQueryInfo, SSDataBlock* pBlock, SHashObj* pTableIdInfo, int32_t order) {
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
pTableQueryInfo->lastKey = ((order == TSDB_ORDER_ASC)? pBlock->info.window.ekey:pBlock->info.window.skey) + step;
STableIdInfo tidInfo = createTableIdInfo(pTableQueryInfo);
STableIdInfo *idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid));
if (idinfo != NULL) {
@ -4408,7 +4411,10 @@ static SSDataBlock* doArithmeticOperation(void* param) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SOptrBasicInfo *pInfo = &pArithInfo->binfo;
pInfo->pRes->info.rows = 0;
SSDataBlock* pRes = pInfo->pRes;
int32_t order = pRuntimeEnv->pQuery->order.order;
pRes->info.rows = 0;
while(1) {
SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream);
@ -4417,17 +4423,20 @@ static SSDataBlock* doArithmeticOperation(void* param) {
break;
}
setTagValue(pOperator, pRuntimeEnv->pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->pQuery->current;
// todo dynamic set tags
setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, pRuntimeEnv->pQuery->order.order);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(pArithInfo, pBlock->info.rows);
arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order);
pInfo->pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
updateTableIdInfo(pRuntimeEnv->pQuery->current, pRuntimeEnv->pTableRetrieveTsMap);
if (pInfo->pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
break;
}
}