From 0f22b3d41b43f9ada67168c878ec8492e640af47 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 May 2021 10:17:16 +0800 Subject: [PATCH] [td-3186] --- src/client/inc/tscUtil.h | 13 +- src/client/inc/tsclient.h | 2 +- src/client/src/tscAsync.c | 15 +- src/client/src/tscSQLParser.c | 6 +- src/client/src/tscServer.c | 9 +- src/client/src/tscSubquery.c | 39 ++--- src/client/src/tscUtil.c | 292 ++++++++++++++++++++++------------ src/query/src/qExecutor.c | 2 +- 8 files changed, 236 insertions(+), 142 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index ca1ea54e16..48b1138072 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -20,6 +20,7 @@ extern "C" { #endif +#include #include "exception.h" #include "os.h" #include "qExtbuffer.h" @@ -93,22 +94,12 @@ typedef struct SVgroupTableInfo { SArray *itemList; // SArray } SVgroupTableInfo; -static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd) { - assert(pCmd != NULL); - if (pCmd->pQueryInfo == NULL) { - return NULL; - } - - return pCmd->active; -} - -SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd); - int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf); void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo); +void doRetrieveSubqueryData(SSchedMsg *pMsg); SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes, uint32_t offset); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 875b485cf2..9a22961f30 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -454,7 +454,7 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo); void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock); -void handleDownstreamOperator(SSqlRes** pRes, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput); +void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput); void destroyTableNameList(SSqlCmd* pCmd); void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 1456a91099..20a2164149 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -222,6 +222,17 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { tscResetForNextRetrieve(pRes); // handle the sub queries of join query + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); + if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { + SSchedMsg schedMsg = {0}; + schedMsg.fp = doRetrieveSubqueryData; + schedMsg.ahandle = (void *)pSql->self; + schedMsg.thandle = (void *)1; + schedMsg.msg = 0; + taosScheduleTask(tscQhandle, &schedMsg); + return; + } + if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { tscFetchDatablockForSubquery(pSql); } else if (pRes->completed) { @@ -258,7 +269,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } - SQueryInfo* pQueryInfo1 = tscGetActiveQueryInfo(&pSql->cmd); + SQueryInfo* pQueryInfo1 = tscGetQueryInfo(&pSql->cmd); tscBuildAndSendRequest(pSql, pQueryInfo1); } } @@ -477,7 +488,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { } } else { // stream computing - SQueryInfo* pQueryInfo = tscGetActiveQueryInfo(pCmd); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; code = tscGetTableMeta(pSql, pTableMetaInfo); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 365ff4147a..675ce27de2 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -7201,6 +7201,8 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { } } + tfree(pTableMeta); + // load the table meta for a given table name list if (taosArrayGetSize(plist) > 0) { int32_t code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList); @@ -7334,11 +7336,13 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS // NOTE: order mix up in subquery not support yet. pQueryInfo->order = pSub->order; - char* tmp = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES); + STableMetaInfo** tmp = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES); if (tmp == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } + pQueryInfo->pTableMetaInfo = tmp; + pQueryInfo->pTableMetaInfo[pQueryInfo->numOfTables] = pTableMetaInfo1; pQueryInfo->numOfTables += 1; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index f35be615a4..eb61894061 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -539,7 +539,7 @@ int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo) { int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload; - SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(&pSql->cmd); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); pRetrieveMsg->free = htons(pQueryInfo->type); pRetrieveMsg->qId = htobe64(pSql->res.qId); @@ -822,7 +822,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_TSC_INVALID_SQL; // todo add test for this } - SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); SQueryAttr query = {{0}}; tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql); @@ -1620,7 +1620,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { } // global aggregation may be the upstream for parent query - SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); if (pQueryInfo->pQInfo == NULL) { STableGroupInfo tableGroupInfo = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),}; tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); @@ -2061,6 +2061,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { pSql->res.numOfTotal = pMultiMeta->numOfTables; tscDebug("0x%"PRIx64" load multi-tableMeta resp from complete numOfTables:%d", pSql->self, pMultiMeta->numOfTables); + taosHashCleanup(pSet); return TSDB_CODE_SUCCESS; } @@ -2363,7 +2364,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { pRes->completed = (pRetrieve->completed == 1); pRes->data = pRetrieve->data; - SQueryInfo* pQueryInfo = tscGetActiveQueryInfo(pCmd); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) { return pRes->code; } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 406214aba0..0ca79bc702 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -82,8 +82,7 @@ static bool allSubqueryDone(SSqlObj *pParentSql) { for (int i = 0; i < subState->numOfSub; i++) { SSqlObj* pSub = pParentSql->pSubs[i]; if (0 == subState->states[i]) { - tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index: %d NOT finished, abort query completion check", pParentSql->self, - pSub->self, i); + tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index: %d NOT finished yet", pParentSql->self, pSub->self, i); done = false; break; } else { @@ -100,23 +99,21 @@ static bool allSubqueryDone(SSqlObj *pParentSql) { bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { SSubqueryState *subState = &pParentSql->subState; - assert(idx < subState->numOfSub); pthread_mutex_lock(&subState->mutex); - bool done = allSubqueryDone(pParentSql); - if (done) { - tscDebug("0x%"PRIx64" subquery:0x%"PRIx64",%d all subs already done", pParentSql->self, pSql->self, idx); - pthread_mutex_unlock(&subState->mutex); - return false; - } - - tscDebug("0x%"PRIx64" subquery:0x%"PRIx64",%d state set to 1", pParentSql->self, pSql->self, idx); +// bool done = allSubqueryDone(pParentSql); +// if (done) { +// tscDebug("0x%"PRIx64" subquery:0x%"PRIx64",%d all subs already done", pParentSql->self, pSql->self, idx); +// pthread_mutex_unlock(&subState->mutex); +// return false; +// } + tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index:%d state set to 1", pParentSql->self, pSql->self, idx); subState->states[idx] = 1; - done = allSubqueryDone(pParentSql); + bool done = allSubqueryDone(pParentSql); pthread_mutex_unlock(&subState->mutex); return done; } @@ -2415,7 +2412,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { const uint32_t nBufferSize = (1u << 16u); // 64KB - SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); SSubqueryState *pState = &pSql->subState; @@ -3487,8 +3484,6 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pSourceOperator, char* sql, void* merger, int32_t stage) { assert(pQueryInfo != NULL); -// int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput; - SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { goto _cleanup; @@ -3506,25 +3501,25 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STable pQueryAttr->tableGroupInfo = *pTableGroupInfo; // calculate the result row size - SExprInfo* pei = NULL; + SExprInfo* pEx = NULL; int32_t num = 0; if (pQueryAttr->pExpr3 != NULL) { - pei = pQueryAttr->pExpr3; + pEx = pQueryAttr->pExpr3; num = pQueryAttr->numOfExpr3; } else if (pQueryAttr->pExpr2 != NULL) { - pei = pQueryAttr->pExpr2; + pEx = pQueryAttr->pExpr2; num = pQueryAttr->numOfExpr2; } else { - pei = pQueryAttr->pExpr1; + pEx = pQueryAttr->pExpr1; num = pQueryAttr->numOfOutput; } for (int16_t col = 0; col < num; ++col) { - pQueryAttr->resultRowSize += pei[col].base.resBytes; + pQueryAttr->resultRowSize += pEx[col].base.resBytes; // keep the tag length - if (TSDB_COL_IS_TAG(pei[col].base.colInfo.flag)) { - pQueryAttr->tagLen += pei[col].base.resBytes; + if (TSDB_COL_IS_TAG(pEx[col].base.colInfo.flag)) { + pQueryAttr->tagLen += pEx[col].base.resBytes; } } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 5bd7417f5f..b73e2e634b 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -648,7 +648,7 @@ static SColumnInfo* extractColumnInfoFromResult(SArray* pTableCols) { typedef struct SDummyInputInfo { SSDataBlock *block; - SSqlRes *pRes; // refactor: remove it + SSqlObj *pSql; // refactor: remove it } SDummyInputInfo; typedef struct SJoinStatus { @@ -664,92 +664,198 @@ typedef struct SJoinOperatorInfo { SRspResultInfo resultInfo; // todo refactor, add this info for each operator } SJoinOperatorInfo; -SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { - SOperatorInfo *pOperator = (SOperatorInfo*) param; - - SDummyInputInfo *pInput = pOperator->info; - char* pData = pInput->pRes->data; - - SSDataBlock* pBlock = pInput->block; - pBlock->info.rows = pInput->pRes->numOfRows; - if (pBlock->info.rows == 0) { - return NULL; - } - - //TODO refactor +static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock) { int32_t offset = 0; + char* pData = pRes->data; for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); if (pData != NULL) { pColData->pData = pData + offset * pBlock->info.rows; } else { - pColData->pData = pInput->pRes->urow[i]; + pColData->pData = pRes->urow[i]; } offset += pColData->info.bytes; } - pInput->pRes->numOfRows = 0; + pRes->numOfRows = 0; +} + +// NOTE: there is already exists data blocks before this function calls. +SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { + SOperatorInfo *pOperator = (SOperatorInfo*) param; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SDummyInputInfo *pInput = pOperator->info; + SSqlObj* pSql = pInput->pSql; + SSqlRes* pRes = &pSql->res; + + SSDataBlock* pBlock = pInput->block; + + pBlock->info.rows = pRes->numOfRows; + if (pRes->numOfRows != 0) { + doSetupSDataBlock(pRes, pBlock); + *newgroup = false; + return pBlock; + } + + // No data block exists. So retrieve and transfer it into to SSDataBlock + TAOS_ROW pRow = NULL; + taos_fetch_block(pSql, &pRow); + + if (pRes->numOfRows == 0) { + pOperator->status = OP_EXEC_DONE; + return NULL; + } + + pBlock->info.rows = pRes->numOfRows; + doSetupSDataBlock(pRes, pBlock); *newgroup = false; return pBlock; } -SSDataBlock* doBlockJoin(void* param, bool* newgroup) { +static int32_t v = 0; +SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) { SOperatorInfo *pOperator = (SOperatorInfo*) param; - assert(pOperator->numOfUpstream > 1); - - SSDataBlock* block0 = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - SSDataBlock* block1 = pOperator->upstream[1]->exec(pOperator->upstream[1], newgroup); - - if (block1 == NULL || block0 == NULL) { + if (pOperator->status == OP_EXEC_DONE) { return NULL; } - assert(block0 != block1); + assert(pOperator->numOfUpstream > 1); SJoinOperatorInfo* pJoinInfo = pOperator->info; - pJoinInfo->status[0].pBlock = block0; - pJoinInfo->status[1].pBlock = block1; + pJoinInfo->pRes->info.rows = 0; - SJoinStatus* st0 = &pJoinInfo->status[0]; - SJoinStatus* st1 = &pJoinInfo->status[1]; + while(1) { + for (int32_t i = 0; i < pOperator->numOfUpstream; ++i) { + SJoinStatus* pStatus = &pJoinInfo->status[i]; + if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) { + pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup); + pStatus->index = 0; - while (st0->index < st0->pBlock->info.rows && st1->index < st1->pBlock->info.rows) { - SColumnInfoData* p0 = taosArrayGet(st0->pBlock->pDataBlock, 0); - SColumnInfoData* p1 = taosArrayGet(st1->pBlock->pDataBlock, 0); + if (i == 0 && pStatus->pBlock != NULL) { + v += pStatus->pBlock->info.rows; + printf("---------------%d\n", v); + } - int64_t* ts0 = (int64_t*) p0->pData; - int64_t* ts1 = (int64_t*) p1->pData; - if (ts0[st0->index] == ts1[st1->index]) { // add to the final result buffer - // check if current output buffer is over the threshold to pause current loop - int32_t rows = pJoinInfo->pRes->info.rows; - for(int32_t j = 0; j < st0->pBlock->info.numOfCols; ++j) { - SColumnInfoData* pCol1 = taosArrayGet(pJoinInfo->pRes->pDataBlock, j); - SColumnInfoData* pSrc = taosArrayGet(st0->pBlock->pDataBlock, j); + if (pStatus->pBlock == NULL) { + pOperator->status = OP_EXEC_DONE; - int32_t bytes = pSrc->info.bytes; - memcpy(pCol1->pData + rows * bytes, pSrc->pData + st0->index * bytes, bytes); + pJoinInfo->resultInfo.total += pJoinInfo->pRes->info.rows; + return pJoinInfo->pRes; + } } - - for(int32_t j = 0; j < st1->pBlock->info.numOfCols; ++j) { - SColumnInfoData* pCol1 = taosArrayGet(pJoinInfo->pRes->pDataBlock, j + st0->pBlock->info.numOfCols); - SColumnInfoData* pSrc = taosArrayGet(st1->pBlock->pDataBlock, j); - - int32_t bytes = pSrc->info.bytes; - memcpy(pCol1->pData + rows * bytes, pSrc->pData + st1->index * bytes, bytes); - } - - st0->index++; - st1->index++; - pJoinInfo->pRes->info.rows++; - } else if (ts0[st0->index] < ts1[st1->index]) { - st0->index++; - } else { - st1->index++; } - } - return pJoinInfo->pRes; + SJoinStatus* st0 = &pJoinInfo->status[0]; + SColumnInfoData* p0 = taosArrayGet(st0->pBlock->pDataBlock, 0); + int64_t* ts0 = (int64_t*) p0->pData; + + bool prefixEqual = true; + + while(1) { + prefixEqual = true; + for (int32_t i = 1; i < pJoinInfo->numOfUpstream; ++i) { + SJoinStatus* st = &pJoinInfo->status[i]; + + SColumnInfoData* p = taosArrayGet(st->pBlock->pDataBlock, 0); + int64_t* ts = (int64_t*)p->pData; + + if (ts[st->index] < ts0[st0->index]) { // less than the first + prefixEqual = false; + if ((++(st->index)) >= st->pBlock->info.rows) { + break; + } + } else if (ts[st->index] > ts0[st0->index]) { // greater than the first; + if (prefixEqual == true) { + prefixEqual = false; + for (int32_t j = 0; j < i; ++j) { + SJoinStatus* stx = &pJoinInfo->status[j]; + if ((++(stx->index)) >= stx->pBlock->info.rows) { + break; + } + } + } else { + if ((++(st0->index)) >= st0->pBlock->info.rows) { + break; + } + } + } + } + + if (prefixEqual) { + int32_t offset = 0; + bool completed = false; + for (int32_t i = 0; i < pOperator->numOfUpstream; ++i) { + SJoinStatus* st1 = &pJoinInfo->status[i]; + int32_t rows = pJoinInfo->pRes->info.rows; + + for (int32_t j = 0; j < st1->pBlock->info.numOfCols; ++j) { + SColumnInfoData* pCol1 = taosArrayGet(pJoinInfo->pRes->pDataBlock, j + offset); + SColumnInfoData* pSrc = taosArrayGet(st1->pBlock->pDataBlock, j); + + int32_t bytes = pSrc->info.bytes; + memcpy(pCol1->pData + rows * bytes, pSrc->pData + st1->index * bytes, bytes); + } + + offset += st1->pBlock->info.numOfCols; + if ((++(st1->index)) == st1->pBlock->info.rows) { + completed = true; + } + } + + if ((++pJoinInfo->pRes->info.rows) >= pJoinInfo->resultInfo.capacity) { + pJoinInfo->resultInfo.total += pJoinInfo->pRes->info.rows; + return pJoinInfo->pRes; + } + + if (completed == true) { + break; + } + } + } +/* + while (st0->index < st0->pBlock->info.rows && st1->index < st1->pBlock->info.rows) { + SColumnInfoData* p0 = taosArrayGet(st0->pBlock->pDataBlock, 0); + SColumnInfoData* p1 = taosArrayGet(st1->pBlock->pDataBlock, 0); + + int64_t* ts0 = (int64_t*)p0->pData; + int64_t* ts1 = (int64_t*)p1->pData; + if (ts0[st0->index] == ts1[st1->index]) { // add to the final result buffer + // check if current output buffer is over the threshold to pause current loop + int32_t rows = pJoinInfo->pRes->info.rows; + for (int32_t j = 0; j < st0->pBlock->info.numOfCols; ++j) { + SColumnInfoData* pCol1 = taosArrayGet(pJoinInfo->pRes->pDataBlock, j); + SColumnInfoData* pSrc = taosArrayGet(st0->pBlock->pDataBlock, j); + + int32_t bytes = pSrc->info.bytes; + memcpy(pCol1->pData + rows * bytes, pSrc->pData + st0->index * bytes, bytes); + } + + for (int32_t j = 0; j < st1->pBlock->info.numOfCols; ++j) { + SColumnInfoData* pCol1 = taosArrayGet(pJoinInfo->pRes->pDataBlock, j + st0->pBlock->info.numOfCols); + SColumnInfoData* pSrc = taosArrayGet(st1->pBlock->pDataBlock, j); + + int32_t bytes = pSrc->info.bytes; + memcpy(pCol1->pData + rows * bytes, pSrc->pData + st1->index * bytes, bytes); + } + + st0->index++; + st1->index++; + + if ((++pJoinInfo->pRes->info.rows) >= pJoinInfo->resultInfo.capacity) { + pJoinInfo->resultInfo.total += pJoinInfo->pRes->info.rows; + return pJoinInfo->pRes; + } + } else if (ts0[st0->index] < ts1[st1->index]) { + st0->index++; + } else { + st1->index++; + } + }*/ + } } static void destroyDummyInputOperator(void* param, int32_t numOfOutput) { @@ -762,15 +868,15 @@ static void destroyDummyInputOperator(void* param, int32_t numOfOutput) { } pInfo->block = destroyOutputBuf(pInfo->block); - pInfo->pRes = NULL; + pInfo->pSql = NULL; } // todo this operator servers as the adapter for Operator tree and SqlRes result, remove it later -SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t numOfCols) { +SOperatorInfo* createDummyInputOperator(SSqlObj* pSql, SSchema* pSchema, int32_t numOfCols) { assert(numOfCols > 0); SDummyInputInfo* pInfo = calloc(1, sizeof(SDummyInputInfo)); - pInfo->pRes = (SSqlRes*) pResult; + pInfo->pSql = pSql; pInfo->block = calloc(numOfCols, sizeof(SSDataBlock)); pInfo->block->info.numOfCols = numOfCols; @@ -824,7 +930,7 @@ SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstre pOperator->numOfOutput = numOfOutput; pOperator->blockingOptr = false; pOperator->info = pInfo; - pOperator->exec = doBlockJoin; + pOperator->exec = doDataBlockJoin; pOperator->cleanup = destroyDummyInputOperator; for(int32_t i = 0; i < numOfUpstream; ++i) { @@ -848,7 +954,7 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) { pRes->completed = (pRes->numOfRows == 0); } -void handleDownstreamOperator(SSqlRes** pRes, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput) { +void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput) { // handle the following query process if (px->pQInfo == NULL) { SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList); @@ -879,26 +985,29 @@ void handleDownstreamOperator(SSqlRes** pRes, int32_t numOfUpstream, SQueryInfo* // if it is a join query, create join operator here int32_t numOfCol1 = px->pTableMetaInfo[0]->pTableMeta->tableInfo.numOfColumns; - - SOperatorInfo* pSourceOperator = createDummyInputOperator((char*)pRes[0], pSchema, numOfCol1); + SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1); SSchema* schema = NULL; if (px->numOfTables > 1) { - SOperatorInfo* p[2] = {0}; + SOperatorInfo** p = calloc(px->numOfTables, POINTER_BYTES); p[0] = pSourceOperator; - SSchema* pSchema1 = tscGetTableSchema(px->pTableMetaInfo[1]->pTableMeta); - numOfCol1 = px->pTableMetaInfo[1]->pTableMeta->tableInfo.numOfColumns; - - SOperatorInfo* pSourceOperator1 = createDummyInputOperator((char*)pRes[1], pSchema1, numOfCol1); - p[1] = pSourceOperator1; - - int32_t num = pSourceOperator->numOfOutput + pSourceOperator1->numOfOutput; + int32_t num = taosArrayGetSize(px->colList); schema = calloc(num, sizeof(SSchema)); + memcpy(schema, pSchema, numOfCol1*sizeof(SSchema)); - memcpy(&schema[0], pSchema, pSourceOperator->numOfOutput * sizeof(SSchema)); + int32_t offset = pSourceOperator->numOfOutput; + + for(int32_t i = 1; i < px->numOfTables; ++i) { + SSchema* pSchema1 = tscGetTableSchema(px->pTableMetaInfo[i]->pTableMeta); + int32_t n = px->pTableMetaInfo[i]->pTableMeta->tableInfo.numOfColumns; + + p[i] = createDummyInputOperator(pSqlObjList[i], pSchema1, n); + + memcpy(&schema[offset], pSchema1, n * sizeof(SSchema)); + offset += n; + } - memcpy(&schema[pSourceOperator->numOfOutput], pSchema1, pSourceOperator1->numOfOutput * sizeof(SSchema)); pSourceOperator = createJoinOperator(p, px->numOfTables, schema, num); } @@ -917,6 +1026,7 @@ void handleDownstreamOperator(SSqlRes** pRes, int32_t numOfUpstream, SQueryInfo* px->pQInfo = createQInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN); tfree(pColumnInfo); + tfree(schema); } uint64_t qId = 0; @@ -1259,7 +1369,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { return TSDB_CODE_SUCCESS; } -SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd) { +SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd) { return pCmd->active; } @@ -2897,7 +3007,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t return NULL; } - SQueryInfo* pQueryInfo = tscGetActiveQueryInfo(pCmd); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[tableIndex]; pNew->pTscObj = pSql->pTscObj; @@ -3093,29 +3203,15 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { } } -static void doRetrieveSubqueryData(SSchedMsg *pMsg) { +void doRetrieveSubqueryData(SSchedMsg *pMsg) { SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pMsg->ahandle); if (pSql == NULL || pSql->signature != pSql) { tscDebug("%p SqlObj is freed, not add into queue async res", pMsg->ahandle); return; } - int32_t numOfRows = 0; - for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSub = pSql->pSubs[i]; - /*TAOS_ROW row = */taos_fetch_row(pSub); -// SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd); -// int32_t rows = taos_fetch_block(pSub, &row); - if (numOfRows == 0) { - numOfRows = pSub->res.numOfRows; - } - } - - if (numOfRows > 0) { - SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); - SSqlRes* list[2] = {&pSql->pSubs[0]->res, &pSql->pSubs[1]->res}; - handleDownstreamOperator(list, 2, pQueryInfo, &pSql->res); - } + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); + handleDownstreamOperator(pSql->pSubs, pSql->subState.numOfSub, pQueryInfo, &pSql->res); pSql->res.qId = -1; if (pSql->res.code == TSDB_CODE_SUCCESS) { @@ -3133,7 +3229,7 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) { SSqlObj* pSql = tres; if (!subAndCheckDone(pSql, pParentSql, ps->subqueryIndex)) { - tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d freed, not all subquery finished", pParentSql->self, pSql->self, ps->subqueryIndex); + tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d completed, not all subquery finished", pParentSql->self, pSql->self, ps->subqueryIndex); return; } @@ -4047,10 +4143,6 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt tscExprAssign(&pQueryAttr->pExpr2[i], p); } } -// int32_t code = createProjectionExpr(pQueryInfo, pTableMetaInfo, &pQueryAttr->pExpr2, &pQueryAttr->numOfExpr2); -// if (code != TSDB_CODE_SUCCESS) { -// return code; -// } // tag column info int32_t code = createTagColumnInfo(pQueryAttr, pQueryInfo, pTableMetaInfo); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6c92293bfd..37cf2ec7e2 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4858,7 +4858,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) { } pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); - if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { + if (pRes->info.rows >= 1000/*pRuntimeEnv->resultInfo.threshold*/) { break; } }