Merge pull request #2509 from taosdata/feature/query

Feature/query
This commit is contained in:
Shengliang Guan 2020-06-30 15:57:36 +08:00 committed by GitHub
commit 44d02d2c24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 313 additions and 205 deletions

View File

@ -84,7 +84,7 @@ typedef struct SRetrieveSupport {
SColumnModel * pFinalColModel; // colModel for final result SColumnModel * pFinalColModel; // colModel for final result
SSubqueryState * pState; SSubqueryState * pState;
int32_t subqueryIndex; // index of current vnode in vnode list int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSqlObj; SSqlObj * pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t numOfRetry; // record the number of retry times uint32_t numOfRetry; // record the number of retry times
pthread_mutex_t queryMutex; pthread_mutex_t queryMutex;

View File

@ -36,6 +36,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql); int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
int32_t tscHandleInsertRetry(SSqlObj* pSql);
void tscBuildResFromSubqueries(SSqlObj *pSql); void tscBuildResFromSubqueries(SSqlObj *pSql);
void **doSetResultRowData(SSqlObj *pSql, bool finalResult); void **doSetResultRowData(SSqlObj *pSql, bool finalResult);

View File

@ -213,8 +213,7 @@ typedef struct SQueryInfo {
typedef struct { typedef struct {
int command; int command;
uint8_t msgType; uint8_t msgType;
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta bool autoCreated; // create table if it is not existed during retrieve table meta in mnode
int8_t dataSourceType; // load data from file or not
union { union {
int32_t count; int32_t count;
@ -222,18 +221,23 @@ typedef struct {
}; };
int32_t insertType; int32_t insertType;
int32_t clauseIndex; // index of multiple subclause query int32_t clauseIndex; // index of multiple subclause query
char * curSql; // current sql, resume position of sql after parsing paused
int8_t parseFinished; int8_t parseFinished;
short numOfCols; short numOfCols;
uint32_t allocSize; uint32_t allocSize;
char * payload; char * payload;
int32_t payloadLen; int32_t payloadLen;
SQueryInfo **pQueryInfo; SQueryInfo **pQueryInfo;
int32_t numOfClause; int32_t numOfClause;
char * curSql; // current sql, resume position of sql after parsing paused
void * pTableList; // referred table involved in sql
int32_t batchSize; // for parameter ('?') binding and batch processing int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams; int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
int8_t submitSchema; // submit block is built with table schema
SHashObj *pTableList; // referred table involved in sql
SArray *pDataBlocks; // SArray<STableDataBlocks*> submit data blocks after parsing sql SArray *pDataBlocks; // SArray<STableDataBlocks*> submit data blocks after parsing sql
} SSqlCmd; } SSqlCmd;

View File

@ -431,6 +431,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
return; return;
} }
tscDebug("%p get tableMeta successfully", pSql);
if (pSql->pStream == NULL) { if (pSql->pStream == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
@ -446,20 +448,20 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert(code == TSDB_CODE_SUCCESS); assert(code == TSDB_CODE_SUCCESS);
} }
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pSql->param != NULL);
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL);
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param; SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
SSqlObj * pParObj = trs->pParentSqlObj; SSqlObj * pParObj = trs->pParentSql;
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);
// NOTE: the vgroupInfo for the queried super table must be existed here. // NOTE: the vgroupInfo for the queried super table must be existed here.
assert(pTableMetaInfo->vgroupList != NULL); assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL);
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) { if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
return; return;
} }
goto _error;
} else { // continue to process normal async query } else { // continue to process normal async query
if (pCmd->parseFinished) { if (pCmd->parseFinished) {
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql); tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);
@ -472,18 +474,41 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert(code == TSDB_CODE_SUCCESS); assert(code == TSDB_CODE_SUCCESS);
} }
// if failed to process sql, go to error handler // in case of insert, redo parsing the sql string and build new submit data block for two reasons:
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) { // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated
return; // 2. vnode may need the schema information along with submit block to update its local table schema.
if (pCmd->command == TSDB_SQL_INSERT) {
tscDebug("%p redo parse sql string to build submit block", pSql);
pCmd->parseFinished = false;
if ((code = tsParseSql(pSql, true)) == TSDB_CODE_SUCCESS) {
/*
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
* and send the required submit block according to index value in supporter to server.
*/
pSql->fp = pSql->fetchFp; // restore the fp
if ((code = tscHandleInsertRetry(pSql)) == TSDB_CODE_SUCCESS) {
return;
}
}
} else {// in case of other query type, continue
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
return;
}
} }
// // todo update the submit message according to the new table meta
// // 1. table uid, 2. ip address goto _error;
// code = tscSendMsgToServer(pSql);
// if (code == TSDB_CODE_SUCCESS) return;
} else { } else {
tscDebug("%p continue parse sql after get table meta", pSql); tscDebug("%p continue parse sql after get table meta", pSql);
code = tsParseSql(pSql, false); code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) { if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo); code = tscGetTableMeta(pSql, pTableMetaInfo);
@ -492,45 +517,49 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} else { } else {
assert(code == TSDB_CODE_SUCCESS); assert(code == TSDB_CODE_SUCCESS);
} }
(*pSql->fp)(pSql->param, pSql, code); (*pSql->fp)(pSql->param, pSql, code);
return; return;
} }
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; // proceed to invoke the tscDoQuery();
} }
} }
} else { // stream computing } else { // stream computing
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
pRes->code = code;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex); code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
pRes->code = code; if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; } else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
} }
}
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscQueueAsyncRes(pSql);
return;
}
if (pSql->pStream) {
tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command); tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
if (!pSql->cmd.parseFinished) { if (!pSql->cmd.parseFinished) {
tsParseSql(pSql, false); tsParseSql(pSql, false);
sem_post(&pSql->rspSem); sem_post(&pSql->rspSem);
} }
return; return;
} else {
tscDebug("%p get tableMeta successfully", pSql);
} }
tscDoQuery(pSql); tscDoQuery(pSql);
return;
_error:
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscQueueAsyncRes(pSql);
}
} }

View File

@ -347,8 +347,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
int doProcessSql(SSqlObj *pSql) { int doProcessSql(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
int32_t code = TSDB_CODE_SUCCESS;
if (pCmd->command == TSDB_SQL_SELECT || if (pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE ||
@ -365,10 +364,13 @@ int doProcessSql(SSqlObj *pSql) {
return pRes->code; return pRes->code;
} }
code = tscSendMsgToServer(pSql); int32_t code = tscSendMsgToServer(pSql);
// NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pRes->code = code; pRes->code = code;
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
return pRes->code;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -25,6 +25,7 @@
typedef struct SInsertSupporter { typedef struct SInsertSupporter {
SSubqueryState* pState; SSubqueryState* pState;
SSqlObj* pSql; SSqlObj* pSql;
int32_t index;
} SInsertSupporter; } SInsertSupporter;
static void freeJoinSubqueryObj(SSqlObj* pSql); static void freeJoinSubqueryObj(SSqlObj* pSql);
@ -1414,7 +1415,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
} }
trs->subqueryIndex = i; trs->subqueryIndex = i;
trs->pParentSqlObj = pSql; trs->pParentSql = pSql;
trs->pFinalColModel = pModel; trs->pFinalColModel = pModel;
pthread_mutexattr_t mutexattr; pthread_mutexattr_t mutexattr;
@ -1499,7 +1500,7 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
tscError("sub:%p failed to flush data to disk, reason:%s", tres, tstrerror(code)); tscError("sub:%p failed to flush data to disk, reason:%s", tres, tstrerror(code));
#endif #endif
SSqlObj* pParentSql = trsupport->pParentSqlObj; SSqlObj* pParentSql = trsupport->pParentSql;
pParentSql->res.code = code; pParentSql->res.code = code;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
@ -1508,8 +1509,45 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
tscHandleSubqueryError(trsupport, tres, pParentSql->res.code); tscHandleSubqueryError(trsupport, tres, pParentSql->res.code);
} }
/*
* current query failed, and the retry count is less than the available
* count, retry query clear previous retrieved data, then launch a new sub query
*/
static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, int32_t code) {
SSqlObj *pParentSql = trsupport->pParentSql;
int32_t subqueryIndex = trsupport->subqueryIndex;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
// clear local saved number of results
trsupport->localBuffer->num = 0;
pthread_mutex_unlock(&trsupport->queryMutex);
tscTrace("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql, pSql,
tstrerror(code), subqueryIndex, trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSql, trsupport, pSql);
// todo add to async res or not??
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d",
trsupport->pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex);
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
return pParentSql->res.code;
}
taos_free_result(pSql);
return tscProcessSql(pNew);
}
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
SSqlObj *pParentSql = trsupport->pParentSqlObj; SSqlObj *pParentSql = trsupport->pParentSql;
int32_t subqueryIndex = trsupport->subqueryIndex; int32_t subqueryIndex = trsupport->subqueryIndex;
assert(pSql != NULL); assert(pSql != NULL);
@ -1528,38 +1566,16 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tscDebug("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", pParentSql, pSql, tscDebug("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", pParentSql, pSql,
subqueryIndex, pParentSql->res.code); subqueryIndex, pParentSql->res.code);
} }
if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query. if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query.
tscDebug("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pParentSql, pSql, numOfRows, subqueryIndex); tscDebug("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pParentSql, pSql, numOfRows, subqueryIndex);
tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pParentSql, pSql, tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pParentSql, pSql,
subqueryIndex, pParentSql->res.code); subqueryIndex, pParentSql->res.code);
} else { } else {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
/* if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
* current query failed, and the retry count is less than the available
* count, retry query clear previous retrieved data, then launch a new sub query
*/
tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
// clear local saved number of results
trsupport->localBuffer->num = 0;
pthread_mutex_unlock(&trsupport->queryMutex);
tscDebug("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql,
tstrerror(numOfRows), subqueryIndex, trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery sqlObj due to out of memory, abort retry",
trsupport->pParentSqlObj, pSql);
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
return; return;
} }
tscProcessSql(pNew);
return;
} else { // reach the maximum retry count, abort } else { // reach the maximum retry count, abort
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows); atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);
tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql, pSql, tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql, pSql,
@ -1600,7 +1616,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) { static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) {
int32_t idx = trsupport->subqueryIndex; int32_t idx = trsupport->subqueryIndex;
SSqlObj * pPObj = trsupport->pParentSqlObj; SSqlObj * pParentSql = trsupport->pParentSql;
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
SSubqueryState* pState = trsupport->pState; SSubqueryState* pState = trsupport->pState;
@ -1610,7 +1626,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
// data in from current vnode is stored in cache and disk // data in from current vnode is stored in cache and disk
uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num; uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num;
tscDebug("%p sub:%p all data retrieved from ip:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, tscDebug("%p sub:%p all data retrieved from ip:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql,
pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId,
numOfRowsFromSubquery, idx); numOfRowsFromSubquery, idx);
@ -1624,15 +1640,14 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
trsupport->localBuffer->num, colInfo); trsupport->localBuffer->num, colInfo);
#endif #endif
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql,
tsAvailTmpDirGB, tsMinimalTmpDirGB); tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE); return tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE);
return;
} }
// each result for a vnode is ordered as an independant list, // each result for a vnode is ordered as an independant list,
// then used as an input of loser tree for disk-based merge routine // then used as an input of loser tree for disk-based merge
int32_t code = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pQueryInfo->groupbyExpr.orderType); int32_t code = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pQueryInfo->groupbyExpr.orderType);
if (code != 0) { // set no disk space error info, and abort retry if (code != 0) { // set no disk space error info, and abort retry
return tscAbortFurtherRetryRetrieval(trsupport, pSql, code); return tscAbortFurtherRetryRetrieval(trsupport, pSql, code);
@ -1640,7 +1655,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
int32_t remain = -1; int32_t remain = -1;
if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) {
tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
pState->numOfTotal - remain); pState->numOfTotal - remain);
return tscFreeSubSqlObj(trsupport, pSql); return tscFreeSubSqlObj(trsupport, pSql);
@ -1649,29 +1664,29 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
// all sub-queries are returned, start to local merge process // all sub-queries are returned, start to local merge process
pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pPObj, tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pParentSql,
pState->numOfTotal, pState->numOfRetrievedRows); pState->numOfTotal, pState->numOfRetrievedRows);
SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
tscClearInterpInfo(pPQueryInfo); tscClearInterpInfo(pPQueryInfo);
tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, pPObj); tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, pParentSql);
tscDebug("%p build loser tree completed", pPObj); tscDebug("%p build loser tree completed", pParentSql);
pPObj->res.precision = pSql->res.precision; pParentSql->res.precision = pSql->res.precision;
pPObj->res.numOfRows = 0; pParentSql->res.numOfRows = 0;
pPObj->res.row = 0; pParentSql->res.row = 0;
// only free once // only free once
tfree(trsupport->pState); tfree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql); tscFreeSubSqlObj(trsupport, pSql);
// set the command flag must be after the semaphore been correctly set. // set the command flag must be after the semaphore been correctly set.
pPObj->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
if (pPObj->res.code == TSDB_CODE_SUCCESS) { if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pPObj->fp)(pPObj->param, pPObj, 0); (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else { } else {
tscQueueAsyncRes(pPObj); tscQueueAsyncRes(pParentSql);
} }
} }
@ -1679,22 +1694,48 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
SRetrieveSupport *trsupport = (SRetrieveSupport *)param; SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
int32_t idx = trsupport->subqueryIndex; int32_t idx = trsupport->subqueryIndex;
SSqlObj * pPObj = trsupport->pParentSqlObj; SSqlObj * pParentSql = trsupport->pParentSql;
SSqlObj *pSql = (SSqlObj *)tres; SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL) { // sql object has been released in error process, return immediately if (pSql == NULL) { // sql object has been released in error process, return immediately
tscDebug("%p subquery has been released, idx:%d, abort", pPObj, idx); tscDebug("%p subquery has been released, idx:%d, abort", pParentSql, idx);
return; return;
} }
SSubqueryState* pState = trsupport->pState; SSubqueryState* pState = trsupport->pState;
assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pPObj->numOfSubs == pState->numOfTotal); assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);
// query process and cancel query process may execute at the same time // query process and cancel query process may execute at the same time
pthread_mutex_lock(&trsupport->queryMutex); pthread_mutex_lock(&trsupport->queryMutex);
if (numOfRows < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
return tscHandleSubqueryError(trsupport, pSql, numOfRows); SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
tscTrace("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(numOfRows), tstrerror(pParentSql->res.code));
tscHandleSubqueryError(param, tres, numOfRows);
return;
}
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(numOfRows == taos_errno(pSql));
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry);
if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
return;
}
} else {
tscTrace("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(numOfRows));
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows); // set global code and abort
}
tscHandleSubqueryError(param, tres, numOfRows);
return;
} }
SSqlRes * pRes = &pSql->res; SSqlRes * pRes = &pSql->res;
@ -1704,14 +1745,13 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
assert(pRes->numOfRows == numOfRows); assert(pRes->numOfRows == numOfRows);
int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows); int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ip:%s, orderOfSub:%d", pPObj, pSql, tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ip:%s, orderOfSub:%d", pParentSql, pSql,
pRes->numOfRows, pState->numOfRetrievedRows, pSql->ipList.fqdn[pSql->ipList.inUse], idx); pRes->numOfRows, pState->numOfRetrievedRows, pSql->ipList.fqdn[pSql->ipList.inUse], idx);
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64, tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
pPObj, pSql, tsMaxNumOfOrderedResults, num); pParentSql, pSql, tsMaxNumOfOrderedResults, num);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY); return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
return;
} }
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
@ -1722,11 +1762,11 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo); tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif #endif
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { // no disk space for tmp directory
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
tsAvailTmpDirGB, tsMinimalTmpDirGB); tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql,
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE); tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
return; return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
} }
int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data, int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
@ -1771,80 +1811,56 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SRetrieveSupport *trsupport = (SRetrieveSupport *) param; SRetrieveSupport *trsupport = (SRetrieveSupport *) param;
SSqlObj* pParentSql = trsupport->pParentSqlObj; SSqlObj* pParentSql = trsupport->pParentSql;
SSqlObj* pSql = (SSqlObj *) tres; SSqlObj* pSql = (SSqlObj *) tres;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);
// todo set error code // stable query killed or other subquery failed, all query stopped
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
// stable query is killed, abort further retry
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
tscTrace("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code));
code = pParentSql->res.code;
} tscHandleSubqueryError(param, tres, code);
return;
tscDebug("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%s", pParentSql, pSql,
trsupport->subqueryIndex, tstrerror(code));
} }
/* /*
* if a query on a vnode is failed, all retrieve operations from vnode that occurs later * if a subquery on a vnode failed, all retrieve operations from vnode that occurs later
* than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack * than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack
* function to abort current and remain retrieve process. * function to abort current and remain retrieve process.
* *
* NOTE: thread safe is required. * NOTE: thread safe is required.
*/ */
if (code != TSDB_CODE_SUCCESS) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) { assert(code == taos_errno(pSql));
tscDebug("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code));
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code);
} else { // does not reach the maximum retry time, go on
tscDebug("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
if (pNew == NULL) { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d", tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
trsupport->pParentSqlObj, pSql, pVgroup->vgId, trsupport->subqueryIndex); if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) {
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
} else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL);
taos_free_result(pSql);
tscProcessSql(pNew);
return; return;
} }
}
}
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query
tscDebug("%p sub:%p query failed,ip:%s,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pParentSql->res.code);
tscHandleSubqueryError(param, tres, pParentSql->res.code);
} else { // success, proceed to retrieve data from dnode
tscDebug("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql,
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
tscRetrieveFromDnodeCallBack(param, pSql, 0);
} else { } else {
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); tscTrace("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code));
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort
} }
tscHandleSubqueryError(param, tres, pParentSql->res.code);
return;
}
tscTrace("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql,
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
tscRetrieveFromDnodeCallBack(param, pSql, 0);
} else {
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
} }
} }
@ -1876,13 +1892,36 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
// release data block data // release data block data
tfree(pState); tfree(pState);
// pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
// restore user defined fp // restore user defined fp
pParentObj->fp = pParentObj->fetchFp; pParentObj->fp = pParentObj->fetchFp;
// todo remove this parameter in async callback function definition.
// all data has been sent to vnode, call user function // all data has been sent to vnode, call user function
(*pParentObj->fp)(pParentObj->param, pParentObj, numOfRows); int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS)? pParentObj->res.code:pParentObj->res.numOfRows;
(*pParentObj->fp)(pParentObj->param, pParentObj, v);
}
/**
* it is a subquery, so after parse the sql string, copy the submit block to payload of itself
* @param pSql
* @return
*/
int32_t tscHandleInsertRetry(SSqlObj* pSql) {
assert(pSql != NULL && pSql->param != NULL);
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param;
assert(pSupporter->index < pSupporter->pState->numOfTotal);
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index);
pRes->code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);
if (pRes->code != TSDB_CODE_SUCCESS) {
return pRes->code;
}
return tscProcessSql(pSql);
} }
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
@ -1906,10 +1945,11 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
while(numOfSub < pSql->numOfSubs) { while(numOfSub < pSql->numOfSubs) {
SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
pSupporter->pSql = pSql; pSupporter->pSql = pSql;
pSupporter->pState = pState; pSupporter->pState = pState;
pSupporter->index = numOfSub;
SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);//createSubqueryObj(pSql, 0, multiVnodeInsertFinalize, pSupporter1, TSDB_SQL_INSERT, NULL);
SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);
if (pNew == NULL) { if (pNew == NULL) {
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, numOfSub, strerror(errno)); tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, numOfSub, strerror(errno));
goto _error; goto _error;
@ -1940,6 +1980,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
return pRes->code; // free all allocated resource return pRes->code; // free all allocated resource
} }
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
// use the local variable // use the local variable
for (int32_t j = 0; j < numOfSub; ++j) { for (int32_t j = 0; j < numOfSub; ++j) {
SSqlObj *pSub = pSql->pSubs[j]; SSqlObj *pSub = pSql->pSubs[j];
@ -1947,7 +1989,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
tscProcessSql(pSub); tscProcessSql(pSub);
} }
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:

View File

@ -562,10 +562,8 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema) {
// TODO: optimize this function, handle the case while binary is not presented // TODO: optimize this function, handle the case while binary is not presented
int len = 0;
STableMeta* pTableMeta = pTableDataBlock->pTableMeta; STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMeta);
@ -575,16 +573,37 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
pDataBlock += sizeof(SSubmitBlk); pDataBlock += sizeof(SSubmitBlk);
int32_t flen = 0; // original total length of row int32_t flen = 0; // original total length of row
for (int32_t i = 0; i < tinfo.numOfColumns; ++i) {
flen += TYPE_BYTES[pSchema[i].type]; // schema needs to be included into the submit data block
if (includeSchema) {
int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta);
for(int32_t j = 0; j < numOfCols; ++j) {
STColumn* pCol = (STColumn*) pDataBlock;
pCol->colId = pSchema[j].colId;
pCol->type = pSchema[j].type;
pCol->bytes = pSchema[j].bytes;
pCol->offset = 0;
pDataBlock += sizeof(STColumn);
flen += TYPE_BYTES[pSchema[j].type];
}
int32_t schemaSize = sizeof(STColumn) * numOfCols;
pBlock->schemaLen = schemaSize;
} else {
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
flen += TYPE_BYTES[pSchema[j].type];
}
pBlock->schemaLen = 0;
} }
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk); char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
pBlock->len = 0; pBlock->dataLen = 0;
int32_t numOfRows = htons(pBlock->numOfRows); int32_t numOfRows = htons(pBlock->numOfRows);
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
SDataRow trow = (SDataRow)pDataBlock; SDataRow trow = (SDataRow) pDataBlock;
dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen); dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen);
dataRowSetVersion(trow, pTableMeta->sversion); dataRowSetVersion(trow, pTableMeta->sversion);
@ -595,20 +614,21 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
p += pSchema[j].bytes; p += pSchema[j].bytes;
} }
// p += pTableDataBlock->rowSize;
pDataBlock += dataRowLen(trow); pDataBlock += dataRowLen(trow);
pBlock->len += dataRowLen(trow); pBlock->dataLen += dataRowLen(trow);
} }
len = pBlock->len; int32_t len = pBlock->dataLen + pBlock->schemaLen;
pBlock->len = htonl(pBlock->len); pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
return len; return len;
} }
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
// the expanded size when a row data is converted to SDataRow format // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
const int32_t MAX_EXPAND_SIZE = TD_DATA_ROW_HEAD_SIZE + TYPE_BYTES[TSDB_DATA_TYPE_BINARY]; const int32_t MAX_EXPAND_SIZE = TD_DATA_ROW_HEAD_SIZE + TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
@ -617,7 +637,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
size_t total = taosArrayGetSize(pTableDataBlockList); size_t total = taosArrayGetSize(pTableDataBlockList);
for (int32_t i = 0; i < total; ++i) { for (int32_t i = 0; i < total; ++i) {
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i); STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
STableDataBlocks* dataBuf = NULL; STableDataBlocks* dataBuf = NULL;
int32_t ret = int32_t ret =
@ -666,16 +685,17 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
pBlocks->uid = htobe64(pBlocks->uid); pBlocks->uid = htobe64(pBlocks->uid);
pBlocks->sversion = htonl(pBlocks->sversion); pBlocks->sversion = htonl(pBlocks->sversion);
pBlocks->numOfRows = htons(pBlocks->numOfRows); pBlocks->numOfRows = htons(pBlocks->numOfRows);
pBlocks->schemaLen = 0;
// erase the empty space reserved for binary data // erase the empty space reserved for binary data
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock); int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pCmd->submitSchema);
assert(finalLen <= len); assert(finalLen <= len);
dataBuf->size += (finalLen + sizeof(SSubmitBlk)); dataBuf->size += (finalLen + sizeof(SSubmitBlk));
assert(dataBuf->size <= dataBuf->nAllocSize); assert(dataBuf->size <= dataBuf->nAllocSize);
// the length does not include the SSubmitBlk structure // the length does not include the SSubmitBlk structure
pBlocks->len = htonl(finalLen); pBlocks->dataLen = htonl(finalLen);
dataBuf->numOfTables += 1; dataBuf->numOfTables += 1;
} }

View File

@ -128,10 +128,10 @@ extern float tsTotalLogDirGB;
extern float tsTotalTmpDirGB; extern float tsTotalTmpDirGB;
extern float tsTotalDataDirGB; extern float tsTotalDataDirGB;
extern float tsAvailLogDirGB; extern float tsAvailLogDirGB;
extern float tsAvailTmpDirGB; extern float tsAvailTmpDirectorySpace;
extern float tsAvailDataDirGB; extern float tsAvailDataDirGB;
extern float tsMinimalLogDirGB; extern float tsMinimalLogDirGB;
extern float tsMinimalTmpDirGB; extern float tsReservedTmpDirectorySpace;
extern float tsMinimalDataDirGB; extern float tsMinimalDataDirGB;
extern int32_t tsTotalMemoryMB; extern int32_t tsTotalMemoryMB;
extern int32_t tsVersion; extern int32_t tsVersion;

View File

@ -170,9 +170,9 @@ int64_t tsStreamMax;
int32_t tsNumOfCores = 1; int32_t tsNumOfCores = 1;
float tsTotalTmpDirGB = 0; float tsTotalTmpDirGB = 0;
float tsTotalDataDirGB = 0; float tsTotalDataDirGB = 0;
float tsAvailTmpDirGB = 0; float tsAvailTmpDirectorySpace = 0;
float tsAvailDataDirGB = 0; float tsAvailDataDirGB = 0;
float tsMinimalTmpDirGB = 0.1; float tsReservedTmpDirectorySpace = 0.1;
float tsMinimalDataDirGB = 0.5; float tsMinimalDataDirGB = 0.5;
int32_t tsTotalMemoryMB = 0; int32_t tsTotalMemoryMB = 0;
int32_t tsVersion = 0; int32_t tsVersion = 0;
@ -807,7 +807,7 @@ static void doInitGlobalConfig() {
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "minimalTmpDirGB"; cfg.option = "minimalTmpDirGB";
cfg.ptr = &tsMinimalTmpDirGB; cfg.ptr = &tsReservedTmpDirectorySpace;
cfg.valType = TAOS_CFG_VTYPE_FLOAT; cfg.valType = TAOS_CFG_VTYPE_FLOAT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0.001; cfg.minValue = 0.001;

View File

@ -283,7 +283,8 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
} }
tdAppendColVal(trow, val, c->type, c->bytes, c->offset); tdAppendColVal(trow, val, c->type, c->bytes, c->offset);
} }
pBlk->len = htonl(dataRowLen(trow)); pBlk->dataLen = htonl(dataRowLen(trow));
pBlk->schemaLen = 0;
pBlk->uid = htobe64(pObj->uid); pBlk->uid = htobe64(pObj->uid);
pBlk->tid = htonl(pObj->tid); pBlk->tid = htonl(pObj->tid);

View File

@ -192,7 +192,8 @@ typedef struct SSubmitBlk {
int32_t tid; // table id int32_t tid; // table id
int32_t padding; // TODO just for padding here int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version int32_t sversion; // data schema version
int32_t len; // data part length, not including the SSubmitBlk head int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block int16_t numOfRows; // total number of rows in current submit block
char data[]; char data[];
} SSubmitBlk; } SSubmitBlk;

View File

@ -326,12 +326,12 @@ bool taosGetDisk() {
if (statvfs("/tmp", &info)) { if (statvfs("/tmp", &info)) {
//tsTotalTmpDirGB = 0; //tsTotalTmpDirGB = 0;
//tsAvailTmpDirGB = 0; //tsAvailTmpDirectorySpace = 0;
uError("failed to get disk size, tmpDir:/tmp errno:%s", strerror(errno)); uError("failed to get disk size, tmpDir:/tmp errno:%s", strerror(errno));
return false; return false;
} else { } else {
tsTotalTmpDirGB = (float)((double)info.f_blocks * (double)info.f_frsize / unit); tsTotalTmpDirGB = (float)((double)info.f_blocks * (double)info.f_frsize / unit);
tsAvailTmpDirGB = (float)((double)info.f_bavail * (double)info.f_frsize / unit); tsAvailTmpDirectorySpace = (float)((double)info.f_bavail * (double)info.f_frsize / unit);
} }
return true; return true;

View File

@ -161,12 +161,12 @@ typedef struct SQuery {
} SQuery; } SQuery;
typedef struct SQueryRuntimeEnv { typedef struct SQueryRuntimeEnv {
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery; SQuery* pQuery;
SQLFunctionCtx* pCtx; SQLFunctionCtx* pCtx;
int16_t numOfRowsPerPage; int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS]; int16_t offset[TSDB_MAX_COLUMNS];
uint16_t scanFlag; // denotes reversed scan of data or not uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo; SFillInfo* pFillInfo;
SWindowResInfo windowResInfo; SWindowResInfo windowResInfo;
STSBuf* pTSBuf; STSBuf* pTSBuf;
@ -176,7 +176,8 @@ typedef struct SQueryRuntimeEnv {
void* pQueryHandle; void* pQueryHandle;
void* pSecQueryHandle; // another thread for void* pSecQueryHandle; // another thread for
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
bool topBotQuery; // false; bool topBotQuery; // false
int32_t prevGroupId; // previous executed group id
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
typedef struct SQInfo { typedef struct SQInfo {

View File

@ -3305,13 +3305,16 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
* @param pRuntimeEnv * @param pRuntimeEnv
* @param pDataBlockInfo * @param pDataBlockInfo
*/ */
void setExecutionContext(SQInfo *pQInfo, void* pTable, int32_t groupIndex, TSKEY nextKey) { void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo;
int32_t GROUPRESULTID = 1;
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) {
return;
}
int32_t GROUPRESULTID = 1;
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, sizeof(groupIndex)); SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, sizeof(groupIndex));
if (pWindowRes == NULL) { if (pWindowRes == NULL) {
return; return;
@ -3328,6 +3331,8 @@ void setExecutionContext(SQInfo *pQInfo, void* pTable, int32_t groupIndex, TSKEY
} }
} }
// record the current active group id
pRuntimeEnv->prevGroupId = groupIndex;
setWindowResOutputBuf(pRuntimeEnv, pWindowRes); setWindowResOutputBuf(pRuntimeEnv, pWindowRes);
initCtxOutputBuf(pRuntimeEnv); initCtxOutputBuf(pRuntimeEnv);
@ -4072,6 +4077,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
pRuntimeEnv->pTSBuf = param; pRuntimeEnv->pTSBuf = param;
pRuntimeEnv->cur.vgroupIndex = -1; pRuntimeEnv->cur.vgroupIndex = -1;
pRuntimeEnv->stableQuery = isSTableQuery; pRuntimeEnv->stableQuery = isSTableQuery;
pRuntimeEnv->prevGroupId = INT32_MIN;
if (param != NULL) { if (param != NULL) {
int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
@ -4176,8 +4182,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
if (!isIntervalQuery(pQuery)) { if (!isIntervalQuery(pQuery)) {
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1; int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
setExecutionContext(pQInfo, (*pTableQueryInfo)->pTable, (*pTableQueryInfo)->groupIndex, setExecutionContext(pQInfo, (*pTableQueryInfo)->groupIndex, blockInfo.window.ekey + step);
blockInfo.window.ekey + step);
} else { // interval query } else { // interval query
TSKEY nextKey = blockInfo.window.skey; TSKEY nextKey = blockInfo.window.skey;
setIntervalQueryRange(pQInfo, nextKey); setIntervalQueryRange(pQInfo, nextKey);
@ -4553,7 +4558,8 @@ static void doSaveContext(SQInfo *pQInfo) {
if (pRuntimeEnv->pSecQueryHandle != NULL) { if (pRuntimeEnv->pSecQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
} }
pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pQuery, QUERY_NOT_COMPLETED);

View File

@ -631,5 +631,5 @@ void exprSerializeTest2() {
} }
} // namespace } // namespace
TEST(testCase, astTest) { TEST(testCase, astTest) {
exprSerializeTest2(); // exprSerializeTest2();
} }

View File

@ -768,7 +768,8 @@ static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
SSubmitBlk *pBlock = pIter->pBlock; SSubmitBlk *pBlock = pIter->pBlock;
if (pBlock == NULL) return NULL; if (pBlock == NULL) return NULL;
pBlock->len = htonl(pBlock->len); pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows); pBlock->numOfRows = htons(pBlock->numOfRows);
pBlock->uid = htobe64(pBlock->uid); pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid); pBlock->tid = htonl(pBlock->tid);
@ -776,11 +777,11 @@ static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
pBlock->sversion = htonl(pBlock->sversion); pBlock->sversion = htonl(pBlock->sversion);
pBlock->padding = htonl(pBlock->padding); pBlock->padding = htonl(pBlock->padding);
pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->len; pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->dataLen;
if (pIter->len >= pIter->totalLen) { if (pIter->len >= pIter->totalLen) {
pIter->pBlock = NULL; pIter->pBlock = NULL;
} else { } else {
pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->len + sizeof(SSubmitBlk)); pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->dataLen + sizeof(SSubmitBlk));
} }
return pBlock; return pBlock;
@ -832,10 +833,10 @@ _err:
} }
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pBlock->len <= 0) return -1; if (pBlock->dataLen <= 0) return -1;
pIter->totalLen = pBlock->len; pIter->totalLen = pBlock->dataLen;
pIter->len = 0; pIter->len = 0;
pIter->row = (SDataRow)(pBlock->data); pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen);
return 0; return 0;
} }