This commit is contained in:
slguan 2019-08-02 16:52:22 +08:00
parent 04504ce444
commit fe6672d79b
27 changed files with 1168 additions and 737 deletions

View File

@ -37,13 +37,13 @@ extern "C" {
struct SQLFunctionCtx;
typedef struct SLocalDataSrc {
typedef struct SLocalDataSource {
tExtMemBuffer *pMemBuffer;
int32_t flushoutIdx;
int32_t pageId;
int32_t rowIdx;
tFilePage filePage;
} SLocalDataSrc;
} SLocalDataSource;
enum {
TSC_LOCALREDUCE_READY = 0x0,
@ -52,7 +52,7 @@ enum {
};
typedef struct SLocalReducer {
SLocalDataSrc **pLocalDataSrc;
SLocalDataSource **pLocalDataSrc;
int32_t numOfBuffer;
int32_t numOfCompleted;

View File

@ -41,21 +41,24 @@ typedef struct SParsedColElem {
} SParsedColElem;
typedef struct SParsedDataColInfo {
bool ordered; // denote if the timestamp in one data block ordered or not
int16_t numOfCols;
int16_t numOfAssignedCols;
SParsedColElem elems[TSDB_MAX_COLUMNS];
bool hasVal[TSDB_MAX_COLUMNS];
int64_t prevTimestamp;
} SParsedDataColInfo;
SInsertedDataBlocks* tscCreateDataBlock(int32_t size);
void tscDestroyDataBlock(SInsertedDataBlocks** pDataBlock);
STableDataBlocks* tscCreateDataBlock(int32_t size);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
SDataBlockList* tscCreateBlockArrayList();
void tscDestroyBlockArrayList(SDataBlockList** pList);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, SInsertedDataBlocks* pDataBlock);
void* tscDestroyBlockArrayList(SDataBlockList* pList);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SDataBlockList* pList);
void tscMergeTableDataBlocks(SSqlCmd* pCmd, SDataBlockList* pDataList);
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, char* tableId);
STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name);
SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx);
SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
@ -66,8 +69,7 @@ bool tscIsTwoStageMergeMetricQuery(SSqlObj* pSql);
/**
*
* for the projection query on metric or point interpolation query on metric,
* we iterate all the meters, instead of invoke query on all qualified meters
* simultaneously.
* we iterate all the meters, instead of invoke query on all qualified meters simultaneously.
*
* @param pSql sql object
* @return
@ -124,8 +126,7 @@ void tscIncStreamExecutionCount(void* pStream);
bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId);
// get starter position of metric query condition (query on tags) in
// SSqlCmd.payload
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
char* tsGetMetricQueryCondPos(STagCond* pCond);
void tscTagCondAssign(STagCond* pDst, STagCond* pSrc);
void tscTagCondRelease(STagCond* pCond);
@ -139,6 +140,7 @@ void tscCleanSqlCmd(SSqlCmd* pCmd);
bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql);
void tscDoQuery(SSqlObj* pSql);
int32_t sortRemoveDuplicates(STableDataBlocks* dataBuf, int32_t numOfRows);
#ifdef __cplusplus
}
#endif

View File

@ -169,16 +169,22 @@ typedef struct STagCond {
char * pData;
} STagCond;
typedef struct SInsertedDataBlocks {
char meterId[TSDB_METER_ID_LEN];
int64_t size;
uint32_t nAllocSize;
uint32_t numOfMeters;
typedef struct STableDataBlocks {
char meterId[TSDB_METER_ID_LEN];
int64_t vgid;
int64_t size;
int64_t prevTS;
bool ordered;
int32_t numOfMeters;
int32_t rowSize;
uint32_t nAllocSize;
union {
char *filename;
char *pData;
};
} SInsertedDataBlocks;
} STableDataBlocks;
typedef struct SDataBlockList {
int32_t idx;
@ -186,7 +192,7 @@ typedef struct SDataBlockList {
int32_t nAlloc;
char * userParam; /* user assigned parameters for async query */
void * udfp; /* user defined function pointer, used in async model */
SInsertedDataBlocks **pData;
STableDataBlocks **pData;
} SDataBlockList;
typedef struct {

View File

@ -410,7 +410,7 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows)
tscTrace("%p Async insertion completed, destroy data block list", pSql);
// release data block data
tscDestroyBlockArrayList(&pCmd->pDataBlocks);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
// all data has been sent to vnode, call user function
(*pSql->fp)(pSql->param, tres, numOfRows);

File diff suppressed because it is too large Load Diff

View File

@ -140,12 +140,10 @@ tSQLExpr *tSQLExprIdValueCreate(SSQLToken *pToken, int32_t optrType) {
nodePtr->val.nType = TSDB_DATA_TYPE_BIGINT;
nodePtr->nSQLOptr = TK_TIMESTAMP;
} else { // must be field id if not numbers
if (pToken != NULL) {
assert(optrType == TK_ID);
/* it must be the column name (tk_id) */
assert(optrType == TK_ALL || optrType == TK_ID);
if (pToken != NULL) { // it must be the column name (tk_id)
nodePtr->colInfo = *pToken;
} else {
assert(optrType == TK_ALL);
}
nodePtr->nSQLOptr = optrType;

View File

@ -19,25 +19,26 @@
#include <stdlib.h>
#include "tlosertree.h"
#include "tsclient.h"
#include "tlosertree.h"
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tutil.h"
typedef struct SCompareParam {
SLocalDataSrc ** pLocalData;
tOrderDescriptor *pDesc;
int32_t numOfElems;
int32_t groupOrderType;
SLocalDataSource **pLocalData;
tOrderDescriptor * pDesc;
int32_t numOfElems;
int32_t groupOrderType;
} SCompareParam;
int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
int32_t pLeftIdx = *(int32_t *)pLeft;
int32_t pRightIdx = *(int32_t *)pRight;
SCompareParam * pParam = (SCompareParam *)param;
tOrderDescriptor *pDesc = pParam->pDesc;
SLocalDataSrc ** pLocalData = pParam->pLocalData;
SCompareParam * pParam = (SCompareParam *)param;
tOrderDescriptor * pDesc = pParam->pDesc;
SLocalDataSource **pLocalData = pParam->pLocalData;
/* this input is exhausted, set the special value to denote this */
if (pLocalData[pLeftIdx]->rowIdx == -1) {
@ -105,7 +106,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
}
/*
* todo error process with async process
* todo release allocated memory process with async process
*/
void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
tColModel *finalmodel, SSqlCmd *pCmd, SSqlRes *pRes) {
@ -133,32 +134,32 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (numOfFlush == 0 || numOfBuffer == 0) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscTrace("%p retrieved no data", pSqlObjAddr);
return;
}
if (pDesc->pSchema->maxCapacity >= pMemBuffer[0]->nPageSize) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscError("%p Invalid value of buffer capacity %d and page size %d ", pSqlObjAddr, pDesc->pSchema->maxCapacity,
pMemBuffer[0]->nPageSize);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
pRes->code = TSDB_CODE_APP_ERROR;
return;
}
size_t nReducerSize = sizeof(SLocalReducer) + POINTER_BYTES * numOfFlush;
size_t nReducerSize = sizeof(SLocalReducer) + sizeof(void *) * numOfFlush;
SLocalReducer *pReducer = (SLocalReducer *)calloc(1, nReducerSize);
if (pReducer == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscError("%p failed to create merge structure", pSqlObjAddr);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return;
}
pReducer->pExtMemBuffer = pMemBuffer;
pReducer->pLocalDataSrc = (SLocalDataSrc **)&pReducer[1];
pReducer->pLocalDataSrc = (SLocalDataSource **)&pReducer[1];
assert(pReducer->pLocalDataSrc != NULL);
pReducer->numOfBuffer = numOfFlush;
@ -172,7 +173,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
int32_t numOfFlushoutInFile = pMemBuffer[i]->fileMeta.flushoutData.nLength;
for (int32_t j = 0; j < numOfFlushoutInFile; ++j) {
SLocalDataSrc *pDS = (SLocalDataSrc *)malloc(sizeof(SLocalDataSrc) + pMemBuffer[0]->nPageSize);
SLocalDataSource *pDS = (SLocalDataSource *)malloc(sizeof(SLocalDataSource) + pMemBuffer[0]->nPageSize);
if (pDS == NULL) {
tscError("%p failed to create merge structure", pSqlObjAddr);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
@ -468,9 +469,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
}
if (pCmd->nAggTimeInterval != 0) {
/*
* the first column is the timestamp, handles queries like "interval(10m) group by tags"
*/
//the first column is the timestamp, handles queries like "interval(10m) group by tags"
orderIdx[numOfGroupByCols - 1] = PRIMARYKEY_TIMESTAMP_COL_INDEX;
}
}
@ -485,29 +484,32 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
}
}
bool isSameGroupOfPrev(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage *tmpPage) {
bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage *tmpBuffer) {
int16_t functionId = tscSqlExprGet(pCmd, 0)->sqlFuncId;
if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) { // column projection query
return false; // disable merge procedure
// disable merge procedure for column projection query
if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) {
return false;
}
tOrderDescriptor *pOrderDesc = pReducer->pDesc;
int32_t numOfCols = pOrderDesc->orderIdx.numOfOrderedCols;
if (numOfCols > 0) {
if (pOrderDesc->orderIdx.pData[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) { //<= 0
/* metric interval query */
assert(pCmd->nAggTimeInterval > 0);
pOrderDesc->orderIdx.numOfOrderedCols -= 1;
} else { /* simple group by query */
assert(pCmd->nAggTimeInterval == 0);
}
} else {
// no group by columns, all data belongs to one group
if (numOfCols <= 0) {
return true;
}
if (pOrderDesc->orderIdx.pData[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) { //<= 0
// super table interval query
assert(pCmd->nAggTimeInterval > 0);
pOrderDesc->orderIdx.numOfOrderedCols -= 1;
} else { // simple group by query
assert(pCmd->nAggTimeInterval == 0);
}
// only one row exists
int32_t ret = compare_a(pOrderDesc, 1, 0, pPrev, 1, 0, tmpPage->data);
int32_t ret = compare_a(pOrderDesc, 1, 0, pPrev, 1, 0, tmpBuffer->data);
pOrderDesc->orderIdx.numOfOrderedCols = numOfCols;
return (ret == 0);
@ -602,7 +604,7 @@ void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDe
* @param treeList
* @return the number of remain input source. if ret == 0, all data has been handled
*/
int32_t loadNewDataFromDiskFor(SLocalReducer *pLocalReducer, SLocalDataSrc *pOneInterDataSrc,
int32_t loadNewDataFromDiskFor(SLocalReducer *pLocalReducer, SLocalDataSource *pOneInterDataSrc,
bool *needAdjustLoserTree) {
pOneInterDataSrc->rowIdx = 0;
pOneInterDataSrc->pageId += 1;
@ -629,8 +631,8 @@ int32_t loadNewDataFromDiskFor(SLocalReducer *pLocalReducer, SLocalDataSrc *pOne
return pLocalReducer->numOfBuffer;
}
void loadDataIntoMemAndAdjustLoserTree(SLocalReducer *pLocalReducer, SLocalDataSrc *pOneInterDataSrc,
SLoserTreeInfo *pTree) {
void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *pOneInterDataSrc,
SLoserTreeInfo *pTree) {
/*
* load a new data page into memory for the intermediate dataset source,
* since its last record in the buffer has been chosen to be processed as the winner of the loser tree
@ -662,10 +664,10 @@ void loadDataIntoMemAndAdjustLoserTree(SLocalReducer *pLocalReducer, SLocalDataS
}
}
void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SSqlCmd *pCmd,
SInterpolationInfo *pInterpoInfo) { // discard following dataset in the
// same group and reset the
// interpolation information
void savePrevRecordAndSetupInterpoInfo(
SLocalReducer *pLocalReducer, SSqlCmd *pCmd,
SInterpolationInfo
*pInterpoInfo) { // discard following dataset in the same group and reset the interpolation information
int64_t stime = (pCmd->stime < pCmd->etime) ? pCmd->stime : pCmd->etime;
int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit);
@ -749,7 +751,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
tColModelErase(pLocalReducer->resColModel, pFinalDataPage, prevSize, 0, pCmd->limit.offset - 1);
/* remove the hole in column model */
tColModelCompress(pLocalReducer->resColModel, pFinalDataPage, prevSize);
tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);
pRes->numOfRows -= pCmd->limit.offset;
pRes->numOfTotal -= pCmd->limit.offset;
@ -772,7 +774,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
pRes->numOfRows -= overFlow;
pFinalDataPage->numOfElems -= overFlow;
tColModelCompress(pLocalReducer->resColModel, pFinalDataPage, prevSize);
tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);
/* set remain data to be discarded, and reset the interpolation information */
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pCmd, &pLocalReducer->interpolationInfo);
@ -892,21 +894,21 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
free(srcData);
}
static void savePrevRecord(SLocalReducer *pLocalReducer, tFilePage *tmpPages) {
static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
tColModel *pColModel = pLocalReducer->pDesc->pSchema;
assert(pColModel->maxCapacity == 1 && tmpPages->numOfElems == 1);
assert(pColModel->maxCapacity == 1 && tmpBuffer->numOfElems == 1);
// copy to previous temp buffer
for (int32_t i = 0; i < pLocalReducer->pDesc->pSchema->numOfCols; ++i) {
memcpy(pLocalReducer->prevRowOfInput + pColModel->colOffset[i], tmpPages->data + pColModel->colOffset[i],
memcpy(pLocalReducer->prevRowOfInput + pColModel->colOffset[i], tmpBuffer->data + pColModel->colOffset[i],
pColModel->pFields[i].bytes);
}
tmpPages->numOfElems = 0;
tmpBuffer->numOfElems = 0;
pLocalReducer->hasPrevRow = true;
}
static void handleUnprocessedRow(SLocalReducer *pLocalReducer, SSqlCmd *pCmd, tFilePage *tmpPages) {
static void handleUnprocessedRow(SLocalReducer *pLocalReducer, SSqlCmd *pCmd, tFilePage *tmpBuffer) {
if (pLocalReducer->hasUnprocessedRow) {
for (int32_t j = 0; j < pCmd->fieldsInfo.numOfOutputCols; ++j) {
SSqlExpr *pExpr = tscSqlExprGet(pCmd, j);
@ -922,7 +924,7 @@ static void handleUnprocessedRow(SLocalReducer *pLocalReducer, SSqlCmd *pCmd, tF
pLocalReducer->hasUnprocessedRow = false;
// copy to previous temp buffer
savePrevRecord(pLocalReducer, tmpPages);
savePreviousRow(pLocalReducer, tmpBuffer);
}
}
@ -1005,7 +1007,7 @@ int32_t finalizeRes(SSqlCmd *pCmd, SLocalReducer *pLocalReducer) {
* results generated by simple aggregation function, we merge them all into one points
* *Exception*: column projection query, required no merge procedure
*/
bool needToMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, tFilePage *tmpPages) {
bool needToMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
int32_t ret = 0; // merge all result by default
int16_t functionId = tscSqlExprGet(pCmd, 0)->sqlFuncId;
@ -1016,9 +1018,9 @@ bool needToMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, tFilePage *tmpPage
if (pDesc->orderIdx.numOfOrderedCols > 0) {
if (pDesc->tsOrder == TSQL_SO_ASC) { // asc
// todo refactor comparator
ret = compare_a(pLocalReducer->pDesc, 1, 0, pLocalReducer->prevRowOfInput, 1, 0, tmpPages->data);
ret = compare_a(pLocalReducer->pDesc, 1, 0, pLocalReducer->prevRowOfInput, 1, 0, tmpBuffer->data);
} else { // desc
ret = compare_d(pLocalReducer->pDesc, 1, 0, pLocalReducer->prevRowOfInput, 1, 0, tmpPages->data);
ret = compare_d(pLocalReducer->pDesc, 1, 0, pLocalReducer->prevRowOfInput, 1, 0, tmpBuffer->data);
}
}
}
@ -1027,23 +1029,55 @@ bool needToMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, tFilePage *tmpPage
return (ret == 0);
}
void savePreGroupNumOfRes(SSqlRes *pRes) {
// pRes->numOfGroups += 1;
// pRes->pGroupRec = realloc(pRes->pGroupRec,
// pRes->numOfGroups*sizeof(SResRec));
//
// pRes->pGroupRec[pRes->numOfGroups-1].numOfRows = pRes->numOfRows;
// pRes->pGroupRec[pRes->numOfGroups-1].numOfTotal = pRes->numOfTotal;
/*
 * Tell whether the number of result groups recorded in pRes has reached the
 * group limit (glimit.limit) of the command.
 *
 * A negative glimit.limit never reports the limit as reached.
 */
static bool reachGroupResultLimit(SSqlCmd *pCmd, SSqlRes *pRes) {
  if (pCmd->glimit.limit < 0) {
    return false;
  }

  return pRes->numOfGroups >= pCmd->glimit.limit;
}
void doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer,
bool doneOuput) { // there are merged results in buffer, flush to client
/*
 * Account for one more finished result group.
 *
 * Increments pRes->numOfGroups and then checks the group limit.
 *
 * NOTE: recording the per-group numOfRows/numOfTotal into pRes->pGroupRec is
 * currently disabled; only the group counter is maintained.
 *
 * @param pSql sql object
 * @return true if the group limit has been reached after counting this group, false otherwise
 */
static bool saveGroupResultInfo(SSqlObj *pSql) {
  SSqlRes *pResult = &pSql->res;

  pResult->numOfGroups += 1;

  // the number of output groups is bounded by the glimit clause
  return reachGroupResultLimit(&pSql->cmd, pResult);
}
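/**
 * Flush the merged results of the current group held in the result buffer to the client.
 *
 * @param pSql                  sql object
 * @param pLocalReducer         local reducer that owns the result buffer
 * @param noMoreCurrentGroupRes true if no more results follow for the current group (callers pass !sameGroup)
 * @return false if the current group is skipped because of the group offset, in which case it must NOT be
 *         recorded in pRes->numOfGroups; true otherwise
 */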
bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) {
SSqlCmd * pCmd = &pSql->cmd;
SSqlRes * pRes = &pSql->res;
tFilePage *pResBuf = pLocalReducer->pResultBuf;
tColModel *pModel = pLocalReducer->resColModel;
tColModelCompress(pModel, pResBuf, pModel->maxCapacity);
pRes->code = TSDB_CODE_SUCCESS;
/*
* ignore the output of the current group since this group is skipped by user
* We set the numOfRows to be 0 and discard the possible remain results.
*/
if (pCmd->glimit.offset > 0) {
pRes->numOfRows = 0;
pCmd->glimit.offset -= 1;
pLocalReducer->discard = !noMoreCurrentGroupRes;
return false;
}
tColModelCompact(pModel, pResBuf, pModel->maxCapacity);
memcpy(pLocalReducer->pBufForInterpo, pResBuf->data, pLocalReducer->nResultBufSize);
#ifdef _DEBUG_VIEW
@ -1061,9 +1095,9 @@ void doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer,
}
taosInterpoSetStartInfo(&pLocalReducer->interpolationInfo, pResBuf->numOfElems, pCmd->interpoType);
doInterpolateResult(pSql, pLocalReducer, doneOuput);
doInterpolateResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
pRes->code = TSDB_CODE_SUCCESS;
return true;
}
void resetOutputBuf(SSqlCmd *pCmd, SLocalReducer *pLocalReducer) { // reset output buffer to the beginning
@ -1075,10 +1109,8 @@ void resetOutputBuf(SSqlCmd *pCmd, SLocalReducer *pLocalReducer) { // reset out
memset(pLocalReducer->pResultBuf, 0, pLocalReducer->nResultBufSize + sizeof(tFilePage));
}
static void setUpForNewGroupRes(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer *pLocalReducer) {
/*
* In handling data in other groups, we need to reset the interpolation information for a new group data
*/
static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer *pLocalReducer) {
//In handling data in other groups, we need to reset the interpolation information for a new group data
pRes->numOfRows = 0;
pRes->numOfTotal = 0;
pCmd->limit.offset = pLocalReducer->offset;
@ -1093,6 +1125,124 @@ static void setUpForNewGroupRes(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer *pLo
}
}
/*
 * Return true when the count of completed data sources equals the number of
 * buffers handled by the local reducer.
 */
static bool isAllSourcesCompleted(SLocalReducer *pLocalReducer) {
  const int32_t total = pLocalReducer->numOfBuffer;
  const int32_t finished = pLocalReducer->numOfCompleted;

  return finished == total;
}
/*
 * Continue the interpolation of the current group if the interpolation info
 * still reports remaining data.
 *
 * @param pSql sql object
 * @return true if there was remaining data for interpolation (whether or not any
 *         rows were actually produced), false if there was nothing left to do
 */
static bool doInterpolationForCurrentGroup(SSqlObj *pSql) {
  SSqlCmd *           pCmd = &pSql->cmd;
  SLocalReducer *     pReducer = pSql->res.pLocalReducer;
  SInterpolationInfo *pInfo = &pReducer->interpolationInfo;

  if (!taosHasRemainsDataForInterpolation(pInfo)) {
    return false;
  }

  assert(pCmd->interpoType != TSDB_INTERPO_NONE);

  // key stored at index (numOfRawDataInRows - 1) of the result buffer;
  // presumably the timestamp of the last raw row -- confirm against the buffer layout
  tFilePage *pResBuf = pReducer->pResultBuf;
  int64_t    lastKey = *(int64_t *)(pResBuf->data + TSDB_KEYSIZE * (pInfo->numOfRawDataInRows - 1));

  int32_t numOfRemain = taosNumOfRemainPoints(pInfo);
  TSKEY   endKey = taosGetRevisedEndKey(lastKey, pCmd->order.order, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit);

  int32_t numOfRows = taosGetNumOfResultWithInterpo(pInfo, (TSKEY *)pReducer->pBufForInterpo, numOfRemain,
                                                    pCmd->nAggTimeInterval, endKey, pReducer->resColModel->maxCapacity);
  if (numOfRows > 0) {
    // there are rows to generate, run the interpolation
    doInterpolateResult(pSql, pReducer, false);
  }

  return true;
}
/*
 * Handle the tail of a result set: either all inputs are consumed with no
 * pending previous row, the first local data source is NULL, or the previous
 * group is finished and an unprocessed row is waiting.
 *
 * @param pSql sql object
 * @return true if the caller should return to the client now (results are
 *         available, the query is completed, or the group limit is reached);
 *         false if the merge procedure should go on
 */
static bool doHandleLastRemainData(SSqlObj *pSql) {
  SSqlCmd *      pCmd = &pSql->cmd;
  SSqlRes *      pRes = &pSql->res;
  SLocalReducer *pReducer = pRes->pLocalReducer;

  bool groupFinished = pReducer->hasUnprocessedRow && !pReducer->discard;
  bool inputDrained = isAllSourcesCompleted(pReducer) && !pReducer->hasPrevRow;

  // none of the terminal conditions holds, nothing to do here
  if (!(inputDrained || pReducer->pLocalDataSrc[0] == NULL || groupFinished)) {
    return false;
  }

  // interpolation is only attempted when an interpolation type is specified
  if (pCmd->interpoType != TSDB_INTERPO_NONE) {
    SInterpolationInfo *pInfo = &pReducer->interpolationInfo;

    // take the larger one of stime/etime and revise it to get the end key
    int64_t endKey = (pCmd->etime > pCmd->stime) ? pCmd->etime : pCmd->stime;
    endKey = taosGetRevisedEndKey(endKey, pCmd->order.order, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit);

    int32_t numOfRows = taosGetNumOfResultWithInterpo(pInfo, NULL, 0, pCmd->nAggTimeInterval, endKey,
                                                      pReducer->resColModel->maxCapacity);
    if (numOfRows > 0) {
      doInterpolateResult(pSql, pReducer, true);
    }
  }

  // some results have been generated, hand them over to the caller
  if (pRes->numOfRows > 0) {
    return true;
  }

  // every local data source is consumed and no row is pending: the query is completed
  if (isAllSourcesCompleted(pReducer) && !pReducer->hasUnprocessedRow) {
    return true;
  }

  // the previous group is done: count it, and stop if the group limit is reached
  if (saveGroupResultInfo(pSql)) {
    return true;
  }

  // prepare the environment for the results of a new group
  resetEnvForNewResultset(pRes, pCmd, pReducer);
  return false;
}
/*
 * For every output column: move the output position forward by numOfRes
 * entries, re-initialize the function context and invoke the secondary
 * merge function of the column's aggregate.
 *
 * @param pSql     sql object
 * @param numOfRes number of result entries the output buffers are advanced by
 */
static void doMergeWithPrevRows(SSqlObj *pSql, int32_t numOfRes) {
  SSqlCmd *      pCmd = &pSql->cmd;
  SLocalReducer *pReducer = pSql->res.pLocalReducer;

  for (int32_t col = 0; col < pCmd->fieldsInfo.numOfOutputCols; ++col) {
    SSqlExpr *pExpr = tscSqlExprGet(pCmd, col);
    int16_t   functionId = pExpr->sqlFuncId;

    // skip the numOfRes entries that are already in the output buffer
    pReducer->pCtx[col].aOutputBuf += pReducer->pCtx[col].outputBytes * numOfRes;

    // top/bottom keep a separate timestamp output buffer, which is advanced as well
    bool hasTsOutput = (functionId == TSDB_FUNC_TOP_DST) || (functionId == TSDB_FUNC_BOTTOM_DST);
    if (hasTsOutput) {
      pReducer->pCtx[col].ptsOutputBuf = ((char *)pReducer->pCtx[col].ptsOutputBuf + TSDB_KEYSIZE * numOfRes);
    }

    // pass the first parameter of the expression to the function context
    tVariantAssign(&pReducer->pCtx[col].param[0], &pExpr->param[0]);
    aAggs[functionId].init(&pReducer->pCtx[col]);

    pReducer->pCtx[col].currentStage = SECONDARY_STAGE_MERGE;
    aAggs[functionId].distSecondaryMergeFunc(&pReducer->pCtx[col]);
  }
}
/*
 * For every output column: reset the function context, initialize it and
 * invoke the secondary merge function of the column's aggregate.
 *
 * @param pSql sql object
 */
static void doExecuteSecondaryMerge(SSqlObj *pSql) {
  SSqlCmd *      pCmd = &pSql->cmd;
  SLocalReducer *pReducer = pSql->res.pLocalReducer;

  for (int32_t col = 0; col < pCmd->fieldsInfo.numOfOutputCols; ++col) {
    SSqlExpr *pExpr = tscSqlExprGet(pCmd, col);

    // pass the first parameter of the expression to the function context
    tVariantAssign(&pReducer->pCtx[col].param[0], &pExpr->param[0]);

    int16_t functionId = pExpr->sqlFuncId;

    // the context is initialized with a cleared iteration counter and stage
    pReducer->pCtx[col].numOfIteratedElems = 0;
    pReducer->pCtx[col].currentStage = 0;
    aAggs[functionId].init(&pReducer->pCtx[col]);

    // the merge itself runs in the secondary merge stage
    pReducer->pCtx[col].currentStage = SECONDARY_STAGE_MERGE;
    aAggs[functionId].distSecondaryMergeFunc(&pReducer->pCtx[col]);
  }
}
int32_t tscLocalDoReduce(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
@ -1111,82 +1261,35 @@ int32_t tscLocalDoReduce(SSqlObj *pSql) {
SLocalReducer *pLocalReducer = pRes->pLocalReducer;
// set the local reduce in progress
// set the data merge in progress
int32_t prevStatus =
__sync_val_compare_and_swap_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_IN_PROGRESS);
if (prevStatus != TSC_LOCALREDUCE_READY || pLocalReducer == NULL) {
assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED);
/* it is in tscDestroyLocalReducer function already */
return 0;
return TSDB_CODE_SUCCESS;
}
SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo;
tFilePage * tmpPages = pLocalReducer->pTempBuffer;
tFilePage *tmpBuffer = pLocalReducer->pTempBuffer;
bool prevGroupDone = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow;
if ((pLocalReducer->numOfBuffer == pLocalReducer->numOfCompleted && !pLocalReducer->hasPrevRow) ||
pLocalReducer->pLocalDataSrc[0] == NULL || prevGroupDone) {
/* if interpoType == TSDB_INTERPO_NONE, return directly */
if (pCmd->interpoType != TSDB_INTERPO_NONE) {
int64_t etime = (pCmd->stime < pCmd->etime) ? pCmd->etime : pCmd->stime;
etime = taosGetRevisedEndKey(etime, pCmd->order.order, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pCmd->nAggTimeInterval, etime,
pLocalReducer->resColModel->maxCapacity);
if (rows > 0) { // do interpo
doInterpolateResult(pSql, pLocalReducer, true);
}
}
/* numOfRows == 0, means no interpolation results are generated yet */
if (pRes->numOfRows == 0) {
/* local reduce is completed */
if ((pLocalReducer->numOfBuffer == pLocalReducer->numOfCompleted) && (!pLocalReducer->hasUnprocessedRow)) {
pLocalReducer->status = TSC_LOCALREDUCE_READY;
// set the flag, taos_free_result can release this result.
return 0;
} else {
/* start for process result for a new group */
savePreGroupNumOfRes(pRes);
setUpForNewGroupRes(pRes, pCmd, pLocalReducer);
}
} else {
pLocalReducer->status = TSC_LOCALREDUCE_READY;
// set the flag, taos_free_result can release this result.
return 0;
}
if (doHandleLastRemainData(pSql)) {
pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result.
return TSDB_CODE_SUCCESS;
}
if (taosHasNoneInterpoPoints(pInterpoInfo)) {
assert(pCmd->interpoType != TSDB_INTERPO_NONE);
tFilePage *pFinalDataPage = pLocalReducer->pResultBuf;
int64_t etime = *(int64_t *)(pFinalDataPage->data + TSDB_KEYSIZE * (pInterpoInfo->numOfRawDataInRows - 1));
int32_t remain = taosNumOfRemainPoints(pInterpoInfo);
TSKEY ekey = taosGetRevisedEndKey(etime, pCmd->order.order, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain,
pCmd->nAggTimeInterval, ekey, pLocalReducer->resColModel->maxCapacity);
if (rows > 0) { // do interpo
doInterpolateResult(pSql, pLocalReducer, false);
}
pLocalReducer->status = TSC_LOCALREDUCE_READY;
// set the flag, taos_free_result can release this result.
return 0;
if (doInterpolationForCurrentGroup(pSql)) {
pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result.
return TSDB_CODE_SUCCESS;
}
SLoserTreeInfo *pTree = pLocalReducer->pLoserTree;
// clear buffer
handleUnprocessedRow(pLocalReducer, pCmd, tmpPages);
handleUnprocessedRow(pLocalReducer, pCmd, tmpBuffer);
tColModel *pModel = pLocalReducer->pDesc->pSchema;
while (1) {
_reduce_retrieve:
if (pLocalReducer->numOfBuffer == pLocalReducer->numOfCompleted) {
pRes->numOfRows = 0;
if (isAllSourcesCompleted(pLocalReducer)) {
break;
}
@ -1194,12 +1297,12 @@ int32_t tscLocalDoReduce(SSqlObj *pSql) {
printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index);
#endif
assert((pTree->pNode[0].index < pLocalReducer->numOfBuffer) && (pTree->pNode[0].index >= 0) &&
tmpPages->numOfElems == 0);
tmpBuffer->numOfElems == 0);
// chosen from loser tree
SLocalDataSrc *pOneDataSrc = pLocalReducer->pLocalDataSrc[pTree->pNode[0].index];
SLocalDataSource *pOneDataSrc = pLocalReducer->pLocalDataSrc[pTree->pNode[0].index];
tColModelAppend(pModel, tmpPages, pOneDataSrc->filePage.data, pOneDataSrc->rowIdx, 1,
tColModelAppend(pModel, tmpBuffer, pOneDataSrc->filePage.data, pOneDataSrc->rowIdx, 1,
pOneDataSrc->pMemBuffer->pColModel->maxCapacity);
#if defined(_DEBUG_VIEW)
@ -1207,35 +1310,42 @@ int32_t tscLocalDoReduce(SSqlObj *pSql) {
SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pCmd);
tColModelDisplayEx(pModel, tmpPages->data, tmpPages->numOfElems, pModel->maxCapacity, colInfo);
tColModelDisplayEx(pModel, tmpBuffer->data, tmpBuffer->numOfElems, pModel->maxCapacity, colInfo);
#endif
if (pLocalReducer->discard) {
assert(pLocalReducer->hasUnprocessedRow == false);
/* current record belongs to the same group of previous record, need to discard it */
if (isSameGroupOfPrev(pCmd, pLocalReducer, pLocalReducer->discardData->data, tmpPages)) {
tmpPages->numOfElems = 0;
if (isSameGroup(pCmd, pLocalReducer, pLocalReducer->discardData->data, tmpBuffer)) {
tmpBuffer->numOfElems = 0;
pOneDataSrc->rowIdx += 1;
loadDataIntoMemAndAdjustLoserTree(pLocalReducer, pOneDataSrc, pTree);
/* all inputs are exhausted, abort current process */
if (pLocalReducer->numOfBuffer == pLocalReducer->numOfCompleted) {
adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree);
// all inputs are exhausted, abort current process
if (isAllSourcesCompleted(pLocalReducer)) {
break;
}
/* since it belongs to the same group, ignore following records */
// data belongs to the same group needs to be discarded
continue;
} else {
pLocalReducer->discard = false;
pLocalReducer->discardData->numOfElems = 0;
savePreGroupNumOfRes(pRes);
setUpForNewGroupRes(pRes, pCmd, pLocalReducer);
if (saveGroupResultInfo(pSql)) {
pLocalReducer->status = TSC_LOCALREDUCE_READY;
return TSDB_CODE_SUCCESS;
}
resetEnvForNewResultset(pRes, pCmd, pLocalReducer);
}
}
if (pLocalReducer->hasPrevRow) {
if (needToMerge(pCmd, pLocalReducer, tmpPages)) { // belong to the group of the previous row
if (needToMerge(pCmd, pLocalReducer, tmpBuffer)) {
// belong to the group of the previous row, continue process it
for (int32_t j = 0; j < pCmd->fieldsInfo.numOfOutputCols; ++j) {
SSqlExpr *pExpr = tscSqlExprGet(pCmd, j);
tVariantAssign(&pLocalReducer->pCtx[j].param[0], &pExpr->param[0]);
@ -1244,109 +1354,86 @@ int32_t tscLocalDoReduce(SSqlObj *pSql) {
}
// copy to buffer
savePrevRecord(pLocalReducer, tmpPages);
} else { // reduce the previous is completed, start a new one
savePreviousRow(pLocalReducer, tmpBuffer);
} else {
/*
* current row does not belong to the group of previous row.
* so the processing of previous group is completed.
*/
int32_t numOfRes = finalizeRes(pCmd, pLocalReducer);
bool sameGroup = isSameGroupOfPrev(pCmd, pLocalReducer, pLocalReducer->prevRowOfInput, tmpPages);
bool sameGroup = isSameGroup(pCmd, pLocalReducer, pLocalReducer->prevRowOfInput, tmpBuffer);
tFilePage *pResBuf = pLocalReducer->pResultBuf;
/*
* if the previous group does NOTE generate any result
* (pResBuf->numOfElems == 0),
* if the previous group does NOT generate any result (pResBuf->numOfElems == 0),
* continue to process results instead of return results.
*/
if ((!sameGroup && pResBuf->numOfElems > 0) ||
(pResBuf->numOfElems == pLocalReducer->resColModel->maxCapacity)) {
// does not belong to the same group
assert(pResBuf->numOfElems > 0);
doGenerateFinalResults(pSql, pLocalReducer, !sameGroup);
bool notSkipped = doGenerateFinalResults(pSql, pLocalReducer, !sameGroup);
// this row needs to discard, since it belongs to the group of previous
if (pLocalReducer->discard && sameGroup) {
/* this row needs to discard, since it belongs to the group of previous */
pLocalReducer->hasUnprocessedRow = false;
tmpPages->numOfElems = 0;
tmpBuffer->numOfElems = 0;
} else {
// current row does not belong to the previous group, so it has not been handled yet.
pLocalReducer->hasUnprocessedRow = true;
}
resetOutputBuf(pCmd, pLocalReducer);
pOneDataSrc->rowIdx += 1;
/* here we do not check the return value */
loadDataIntoMemAndAdjustLoserTree(pLocalReducer, pOneDataSrc, pTree);
// here we do not check the return value
adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree);
assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS);
if (pRes->numOfRows == 0) {
handleUnprocessedRow(pLocalReducer, pCmd, tmpPages);
handleUnprocessedRow(pLocalReducer, pCmd, tmpBuffer);
if (!sameGroup) {
/* previous group is done, we start a new one by continuing to
* retrieve data */
savePreGroupNumOfRes(pRes);
setUpForNewGroupRes(pRes, pCmd, pLocalReducer);
}
/*
* previous group is done, prepare for the next group
* If previous group is not skipped, keep it in pRes->numOfGroups
*/
if (notSkipped && saveGroupResultInfo(pSql)) {
pLocalReducer->status = TSC_LOCALREDUCE_READY;
return TSDB_CODE_SUCCESS;
}
goto _reduce_retrieve;
resetEnvForNewResultset(pRes, pCmd, pLocalReducer);
}
} else {
/*
* if next record belongs to a new group, we do not handle this record here.
* We start the process in a new round.
*/
if (sameGroup) {
handleUnprocessedRow(pLocalReducer, pCmd, tmpPages);
handleUnprocessedRow(pLocalReducer, pCmd, tmpBuffer);
}
}
pLocalReducer->status = TSC_LOCALREDUCE_READY;
// set the flag, taos_free_result can release this result.
return 0;
// current group has no result,
if (pRes->numOfRows == 0) {
continue;
} else {
pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result.
return TSDB_CODE_SUCCESS;
}
} else { // result buffer is not full
for (int32_t k = 0; k < pCmd->fieldsInfo.numOfOutputCols; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pCmd, k);
pLocalReducer->pCtx[k].aOutputBuf += pLocalReducer->pCtx[k].outputBytes * numOfRes;
if (pExpr->sqlFuncId == TSDB_FUNC_TOP_DST || pExpr->sqlFuncId == TSDB_FUNC_BOTTOM_DST) {
pLocalReducer->pCtx[k].ptsOutputBuf =
((char *)pLocalReducer->pCtx[k].ptsOutputBuf + TSDB_KEYSIZE * numOfRes);
}
/* set the parameters for the SQLFunctionCtx */
tVariantAssign(&pLocalReducer->pCtx[k].param[0], &pExpr->param[0]);
aAggs[pExpr->sqlFuncId].init(&pLocalReducer->pCtx[k]);
pLocalReducer->pCtx[k].currentStage = SECONDARY_STAGE_MERGE;
aAggs[pExpr->sqlFuncId].distSecondaryMergeFunc(&pLocalReducer->pCtx[k]);
}
savePrevRecord(pLocalReducer, tmpPages);
doMergeWithPrevRows(pSql, numOfRes);
savePreviousRow(pLocalReducer, tmpBuffer);
}
}
} else { // put to previous input row for comparision
for (int32_t j = 0; j < pCmd->fieldsInfo.numOfOutputCols; ++j) {
SSqlExpr *pExpr = tscSqlExprGet(pCmd, j);
tVariantAssign(&pLocalReducer->pCtx[j].param[0], &pExpr->param[0]);
pLocalReducer->pCtx[j].numOfIteratedElems = 0;
pLocalReducer->pCtx[j].currentStage = 0;
aAggs[pExpr->sqlFuncId].init(&pLocalReducer->pCtx[j]);
pLocalReducer->pCtx[j].currentStage = SECONDARY_STAGE_MERGE;
aAggs[pExpr->sqlFuncId].distSecondaryMergeFunc(&pLocalReducer->pCtx[j]);
}
// copy to buffer
savePrevRecord(pLocalReducer, tmpPages);
} else {
doExecuteSecondaryMerge(pSql);
savePreviousRow(pLocalReducer, tmpBuffer); // copy the processed row to buffer
}
pOneDataSrc->rowIdx += 1;
loadDataIntoMemAndAdjustLoserTree(pLocalReducer, pOneDataSrc, pTree);
if (pLocalReducer->numOfCompleted == pLocalReducer->numOfBuffer) {
break;
}
adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree);
}
if (pLocalReducer->hasPrevRow) {
@ -1358,8 +1445,7 @@ int32_t tscLocalDoReduce(SSqlObj *pSql) {
}
assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS && pRes->row == 0);
pLocalReducer->status = TSC_LOCALREDUCE_READY;
// set the flag, taos_free_result can release this result.
pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result.
return TSDB_CODE_SUCCESS;
}
@ -1378,7 +1464,8 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
pRes->pLocalReducer = (SLocalReducer *)calloc(1, sizeof(SLocalReducer));
/*
* we need one additional byte space the sprintf function needs one additional space to put '\0' at the end of string
* we need one additional byte space
* the sprintf function needs one additional space to put '\0' at the end of string
*/
size_t allocSize = numOfRes * rowLen + sizeof(tFilePage) + 1;
pRes->pLocalReducer->pResultBuf = (tFilePage *)calloc(1, allocSize);

View File

@ -358,14 +358,17 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
pRes->code = TSDB_CODE_SUCCESS;
}
tscTrace("%p cmd:%d code:%d rsp len:%d", pSql, pCmd->command, pRes->code, pRes->rspLen);
/*
* There is no response callback function for the submit response.
* The actual inserted number of points is the first number.
*/
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) {
pRes->numOfRows += *(int32_t *)pRes->pRsp;
tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
*(int32_t *)pRes->pRsp, pRes->rspLen);
} else {
tscTrace("%p cmd:%d code:%d rsp len:%d", pSql, pCmd->command, pRes->code, pRes->rspLen);
}
}
@ -421,7 +424,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
return ahandle;
}
static SSqlObj* tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj* pOld);
static SSqlObj* tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj* prevSqlObj);
static int tscLaunchMetricSubQueries(SSqlObj *pSql);
int tscProcessSql(SSqlObj *pSql) {
@ -430,12 +433,6 @@ int tscProcessSql(SSqlObj *pSql) {
tscTrace("%p SQL cmd:%d will be processed, name:%s", pSql, pSql->cmd.command, pSql->cmd.name);
// whether don't judge 'isInsertFromFile' ?
if (pSql->cmd.command == TSDB_SQL_INSERT && pCmd->isInsertFromFile == 1) {
// pCmd->isInsertFromFile = 0; // lihui: can not clear the flag
return 0;
}
pSql->retry = 0;
if (pSql->cmd.command < TSDB_SQL_MGMT) {
pSql->maxRetry = 2;
@ -595,7 +592,6 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) {
SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL);
tscTrace("%p sub:%p launch subquery.orderOfSub:%d", pSql, pNew, pNew->cmd.vnodeIdx);
tscProcessSql(pNew);
}
@ -665,7 +661,6 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d",
pPObj, pSql, idx, *trsupport->code);
} else {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && *(trsupport->code) == TSDB_CODE_SUCCESS) {
/*
* current query failed, and the retry count is less than the available count,
@ -675,11 +670,12 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
// clear local saved number of results
trsupport->localBuffer->numOfElems = 0;
pthread_mutex_unlock(&trsupport->queryMutex);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d, new SqlObj:%p",
trsupport->pParentSqlObj, pSql, numOfRows, idx, trsupport->numOfRetry, pNew);
trsupport->pParentSqlObj, pSql, numOfRows, idx, trsupport->numOfRetry, pNew);
tscProcessSql(pNew);
return;
@ -689,7 +685,6 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d",
pPObj, pSql, numOfRows, idx, *trsupport->code);
}
}
if (__sync_add_and_fetch_32(trsupport->numOfFinished, 1) < trsupport->numOfVnodes) {
@ -778,7 +773,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d",
pPObj, pSql, pSvd->ip, pSvd->vnode, numOfRowsFromVnode, idx);
tColModelCompress(pDesc->pSchema, trsupport->localBuffer, pDesc->pSchema->maxCapacity);
tColModelCompact(pDesc->pSchema, trsupport->localBuffer, pDesc->pSchema->maxCapacity);
#ifdef _DEBUG_VIEW
printf("%ld rows data flushed to disk:\n", trsupport->localBuffer->numOfElems);
@ -877,7 +872,7 @@ void tscKillMetricQuery(SSqlObj *pSql) {
tscTrace("%p metric query is cancelled", pSql);
}
static SSqlObj* tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj* prevSqlObj) {
SSqlObj* tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj* prevSqlObj) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlObj *pNew = (SSqlObj *)calloc(1, sizeof(SSqlObj));
@ -2264,8 +2259,6 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql) {
SSqlGroupbyExpr *pGroupby = &pCmd->groupbyExpr;
pMetaMsg->limit = htobe64(pCmd->glimit.limit);
pMetaMsg->offset = htobe64(pCmd->glimit.offset);
pMetaMsg->numOfTags = htons(pCmd->numOfReqTags);
pMetaMsg->numOfGroupbyCols = htons(pGroupby->numOfGroupbyCols);
@ -2750,7 +2743,6 @@ static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId) {
} else {
pNew->fp = tscMeterMetaCallBack;
pNew->param = pSql;
pNew->sqlstr = strdup(pSql->sqlstr);
code = tscProcessSql(pNew);

View File

@ -17,6 +17,7 @@
#include <math.h>
#include <time.h>
#include "ihash.h"
#include "taosmsg.h"
#include "tcache.h"
#include "tkey.h"
@ -31,9 +32,10 @@
/*
* the detailed information regarding metric meta key is:
* fullmetername + '.' + querycond + '.' + [tagId1, tagId2,...] + '.' + group_orderType + '.' + limit + '.' + offset
* fullmetername + '.' + querycond + '.' + [tagId1, tagId2,...] + '.' + group_orderType
*
* if querycond is null, its format is:
* fullmetername + '.' + '(nil)' + '.' + [tagId1, tagId2,...] + '.' + group_orderType + '.' + limit + '.' + offset
* fullmetername + '.' + '(nil)' + '.' + [tagId1, tagId2,...] + '.' + group_orderType
*/
void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* keyStr) {
char* pTagCondStr = NULL;
@ -60,8 +62,7 @@ void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* keyStr) {
pTagCondStr = strdup(tsGetMetricQueryCondPos(&pCmd->tagCond));
}
int32_t keyLen = sprintf(keyStr, "%s.%s.[%s].%d.%lld.%lld", pCmd->name, pTagCondStr, tagIdBuf,
pCmd->groupbyExpr.orderType, pCmd->glimit.limit, pCmd->glimit.offset);
int32_t keyLen = sprintf(keyStr, "%s.%s.[%s].%d", pCmd->name, pTagCondStr, tagIdBuf, pCmd->groupbyExpr.orderType);
free(pTagCondStr);
assert(keyLen <= TSDB_MAX_TAGS_LEN);
@ -142,8 +143,7 @@ bool tscProjectionQueryOnMetric(SSqlObj* pSql) {
/*
* In following cases, return false for project query on metric
* 1. failed to get metermeta from server; 2. not a metric; 3. limit 0; 4.
* show query, instead of a select query
* 1. failed to get metermeta from server; 2. not a metric; 3. limit 0; 4. show query, instead of a select query
*/
if (pCmd->pMeterMeta == NULL || !UTIL_METER_IS_METRIC(pCmd) || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pCmd->exprsInfo.numOfExprs == 0) {
@ -252,7 +252,7 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) {
}
void tscfreeSqlCmdData(SSqlCmd* pCmd) {
tscDestroyBlockArrayList(&pCmd->pDataBlocks);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscTagCondRelease(&pCmd->tagCond);
tscClearFieldInfo(pCmd);
@ -334,20 +334,22 @@ void tscFreeSqlObj(SSqlObj* pSql) {
free(pSql);
}
SInsertedDataBlocks* tscCreateDataBlock(int32_t size) {
SInsertedDataBlocks* dataBuf = (SInsertedDataBlocks*)calloc(1, sizeof(SInsertedDataBlocks));
dataBuf->nAllocSize = (uint32_t) size;
STableDataBlocks* tscCreateDataBlock(int32_t size) {
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
dataBuf->nAllocSize = (uint32_t)size;
dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf->ordered = true;
dataBuf->prevTS = INT64_MIN;
return dataBuf;
}
void tscDestroyDataBlock(SInsertedDataBlocks** pDataBlock) {
if (*pDataBlock == NULL) {
void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
if (pDataBlock == NULL) {
return;
}
tfree((*pDataBlock)->pData);
tfree(*pDataBlock);
tfree(pDataBlock->pData);
tfree(pDataBlock);
}
SDataBlockList* tscCreateBlockArrayList() {
@ -360,29 +362,31 @@ SDataBlockList* tscCreateBlockArrayList() {
return pDataBlockArrayList;
}
void tscDestroyBlockArrayList(SDataBlockList** pList) {
if (*pList == NULL) {
return;
void* tscDestroyBlockArrayList(SDataBlockList* pList) {
if (pList == NULL) {
return NULL;
}
for (int32_t i = 0; i < (*pList)->nSize; i++) {
tscDestroyDataBlock(&(*pList)->pData[i]);
for (int32_t i = 0; i < pList->nSize; i++) {
tscDestroyDataBlock(pList->pData[i]);
}
tfree((*pList)->pData);
tfree(*pList);
tfree(pList->pData);
tfree(pList);
return NULL;
}
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, SInsertedDataBlocks* pDataBlock) {
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
SSqlCmd* pCmd = &pSql->cmd;
pCmd->count = pDataBlock->numOfMeters;
strcpy(pCmd->name, pDataBlock->meterId);
strncpy(pCmd->name, pDataBlock->meterId, TSDB_METER_ID_LEN);
tscAllocPayloadWithSize(pCmd, pDataBlock->nAllocSize);
memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize);
/* set the message length */
// set the message length
pCmd->payloadLen = pDataBlock->nAllocSize;
return tscGetMeterMeta(pSql, pCmd->name);
}
@ -390,12 +394,89 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, SInsertedDataBlocks* pDataBlock
void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
/* release additional memory consumption */
for (int32_t i = 0; i < pList->nSize; ++i) {
SInsertedDataBlocks* pDataBlock = pList->pData[i];
pDataBlock->pData = realloc(pDataBlock->pData, (size_t) pDataBlock->size);
pDataBlock->nAllocSize = (uint32_t) pDataBlock->size;
STableDataBlocks* pDataBlock = pList->pData[i];
pDataBlock->pData = realloc(pDataBlock->pData, pDataBlock->size);
pDataBlock->nAllocSize = (uint32_t)pDataBlock->size;
}
}
/*
 * Create a table data block and fill in its row size, initial write offset and table name.
 *
 * size         number of bytes to allocate for the block payload
 * rowSize      stored in dataBuf->rowSize
 * startOffset  initial value of dataBuf->size, i.e. where the first row will be written
 * name         table (meter) id, copied into dataBuf->meterId
 *
 * Returns the block produced by tscCreateDataBlock().
 */
STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name) {
  // tscCreateDataBlock() takes an int32_t; make the narrowing conversion explicit
  STableDataBlocks* dataBuf = tscCreateDataBlock((int32_t)size);

  dataBuf->rowSize = rowSize;
  dataBuf->size = startOffset;

  /*
   * strncpy() does not append '\0' when the source fills the whole limit, so copy at most
   * TSDB_METER_ID_LEN - 1 bytes and terminate explicitly.
   * NOTE(review): assumes meterId holds at least TSDB_METER_ID_LEN bytes - confirm against the struct.
   */
  strncpy(dataBuf->meterId, name, TSDB_METER_ID_LEN - 1);
  dataBuf->meterId[TSDB_METER_ID_LEN - 1] = '\0';

  return dataBuf;
}
/*
 * Return the data block registered under `id` in pHashList.
 *
 * If no block is registered yet, a new one is created with tscCreateDataBlockEx(), added to the
 * hash under `id`, appended to pDataBlockList, and returned.
 *
 * size, startOffset, rowSize and tableId are only used when a new block has to be created.
 */
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
                                          int32_t startOffset, int32_t rowSize, char* tableId) {
  STableDataBlocks** ppExisting = (STableDataBlocks**)taosGetIntHashData(pHashList, id);
  if (ppExisting != NULL && *ppExisting != NULL) {
    // already registered for this id
    return *ppExisting;
  }

  STableDataBlocks* pCreated = tscCreateDataBlockEx((size_t)size, rowSize, startOffset, tableId);

  // the hash stores the pointer value; use the copy it hands back
  STableDataBlocks* pRegistered = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&pCreated);
  tscAppendDataBlock(pDataBlockList, pRegistered);

  return pRegistered;
}
/*
 * Merge the per-table data blocks in pTableDataBlockList into blocks keyed by vgid.
 *
 * For every table block, the block registered for its vgid is fetched or created, grown if needed,
 * and the table block (after sortRemoveDuplicates() and conversion of its header fields to network
 * byte order) is appended to it.
 *
 * On return pTableDataBlockList has been destroyed, pCmd->pDataBlocks points to the new list, and
 * the merged blocks have been trimmed by tscFreeUnusedDataBlocks().
 */
void tscMergeTableDataBlocks(SSqlCmd* pCmd, SDataBlockList* pTableDataBlockList) {
  void*           pVnodeDataBlockHashList = taosInitIntHash(8, sizeof(void*), taosHashInt);
  SDataBlockList* pVnodeDataBlockList = tscCreateBlockArrayList();

  for (int32_t i = 0; i < pTableDataBlockList->nSize; ++i) {
    STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i];

    STableDataBlocks* dataBuf =
        tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid, TSDB_PAYLOAD_SIZE,
                                tsInsertHeadSize, 0, pOneTableBlock->meterId);

    int64_t destSize = dataBuf->size + pOneTableBlock->size;
    if (dataBuf->nAllocSize < destSize) {
      /*
       * Grow by 1.5x using integer arithmetic on a local copy. nAllocSize is only updated once
       * realloc() has succeeded, so the block never claims more space than it owns.
       */
      int64_t newSize = dataBuf->nAllocSize;
      while (newSize < destSize) {
        int64_t grown = newSize + newSize / 2;
        newSize = (grown > newSize) ? grown : destSize;  // capacities 0 and 1 cannot grow by 1.5x
      }

      char* tmp = NULL;
      if (newSize <= (int64_t)UINT32_MAX) {  // nAllocSize is stored as a 32-bit value
        tmp = realloc(dataBuf->pData, (size_t)newSize);
      }

      if (tmp == NULL) {
        /*
         * Out of memory: the old buffer is still valid but too small, so copying would overflow it.
         * NOTE(review): this function returns void, so the rows of this table block are dropped
         * without telling the caller - consider adding an error return.
         */
        continue;
      }

      // zero the part of the buffer that does not hold merged data yet
      memset(tmp + dataBuf->size, 0, (size_t)(newSize - dataBuf->size));
      dataBuf->pData = tmp;
      dataBuf->nAllocSize = (uint32_t)newSize;
    }

    SShellSubmitBlock* pBlocks = (SShellSubmitBlock*)pOneTableBlock->pData;
    assert(pBlocks->numOfRows * pOneTableBlock->rowSize + sizeof(SShellSubmitBlock) == pOneTableBlock->size);

    pBlocks->numOfRows = (int16_t)sortRemoveDuplicates(pOneTableBlock, pBlocks->numOfRows);

    // header fields are converted to network byte order before the block is copied
    pBlocks->sid = htonl(pBlocks->sid);
    pBlocks->uid = htobe64(pBlocks->uid);
    pBlocks->sversion = htonl(pBlocks->sversion);
    pBlocks->numOfRows = htons(pBlocks->numOfRows);

    memcpy(dataBuf->pData + dataBuf->size, pOneTableBlock->pData, pOneTableBlock->size);

    dataBuf->size += pOneTableBlock->size;
    dataBuf->numOfMeters += 1;
  }

  // free the table data blocks
  tscDestroyBlockArrayList(pTableDataBlockList);

  pCmd->pDataBlocks = pVnodeDataBlockList;
  tscFreeUnusedDataBlocks(pCmd->pDataBlocks);  // release the unused tail of every merged block

  taosCleanUpIntHash(pVnodeDataBlockHashList);
}
void tscCloseTscObj(STscObj* pObj) {
pObj->signature = NULL;
SSqlObj* pSql = pObj->pSql;
@ -821,15 +902,18 @@ int32_t tscValidateName(SSQLToken* pToken) {
pToken->n = strdequote(pToken->z);
strtrim(pToken->z);
pToken->n = (uint32_t)strlen(pToken->z);
int len = tSQLGetToken(pToken->z, &pToken->type);
int len = tSQLGetToken(pToken->z, &pToken->type);
// single token, validate it
if (len == pToken->n){
return validateQuoteToken(pToken);
}
else {
} else {
sep = strnchrNoquote(pToken->z, TS_PATH_DELIMITER[0], pToken->n);
if (sep == NULL) {
return TSDB_CODE_INVALID_SQL;
}
return tscValidateName(pToken);
}
} else {
@ -965,8 +1049,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) {
SSqlObj* pHeatBeat = pObj->pHb;
assert(pHeatBeat == pHeatBeat->signature);
pHeatBeat->cmd.type = 1; // to denote the heart-beat timer close connection
// and free all allocated resources
pHeatBeat->cmd.type = 1; // to denote the heart-beat timer close connection and free all allocated resources
}
bool tscShouldFreeHeatBeat(SSqlObj* pHb) {
@ -1052,7 +1135,6 @@ void tscDoQuery(SSqlObj* pSql) {
if (pCmd->command > TSDB_SQL_LOCAL) {
tscProcessLocalCmd(pSql);
} else {
// add to sql list, so that the show queries could get the query info
if (pCmd->command == TSDB_SQL_SELECT) {
tscAddIntoSqlList(pSql);
}
@ -1061,18 +1143,19 @@ void tscDoQuery(SSqlObj* pSql) {
pSql->cmd.vnodeIdx += 1;
}
if (pSql->fp == NULL) {
if (0 == pCmd->isInsertFromFile) {
tscProcessSql(pSql);
tscProcessMultiVnodesInsert(pSql); // handle the multi-vnode insertion
} else if (1 == pCmd->isInsertFromFile) {
tscProcessMultiVnodesInsertForFile(pSql);
} else {
assert(false);
}
} else {
tscProcessSql(pSql);
}
void* fp = pSql->fp;
if (pCmd->isInsertFromFile == 1) {
tscProcessMultiVnodesInsertForFile(pSql);
} else {
// pSql may be released in this function if it is a async insertion.
tscProcessSql(pSql);
// handle the multi-vnode insertion for sync model
if (fp == NULL) {
assert(pSql->signature == pSql);
tscProcessMultiVnodesInsert(pSql);
}
}
}
}

View File

@ -184,7 +184,7 @@ void tColModelDisplayEx(tColModel *pModel, void *pData, int32_t numOfRows, int32
/*
* compress data into consecutive block without hole in data
*/
void tColModelCompress(tColModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity);
void tColModelCompact(tColModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity);
void tColModelErase(tColModel *pModel, tFilePage *inputBuffer, int32_t maxCapacity, int32_t s, int32_t e);

View File

@ -69,7 +69,7 @@ int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo *pInterpoInfo, int64_t *p
* @param pInterpoInfo
* @return
*/
bool taosHasNoneInterpoPoints(SInterpolationInfo *pInterpoInfo);
bool taosHasRemainsDataForInterpolation(SInterpolationInfo *pInterpoInfo);
int32_t taosNumOfRemainPoints(SInterpolationInfo *pInterpoInfo);

View File

@ -69,11 +69,12 @@ enum _sync_cmd {
};
enum _meter_state {
TSDB_METER_STATE_READY,
TSDB_METER_STATE_IMPORTING,
TSDB_METER_STATE_UPDATING,
TSDB_METER_STATE_DELETING,
TSDB_METER_STATE_DELETED,
TSDB_METER_STATE_READY = 0x00,
TSDB_METER_STATE_INSERT = 0x01,
TSDB_METER_STATE_IMPORTING = 0x02,
TSDB_METER_STATE_UPDATING = 0x04,
TSDB_METER_STATE_DELETING = 0x10,
TSDB_METER_STATE_DELETED = 0x18,
};
typedef struct {
@ -184,10 +185,10 @@ typedef struct _meter_obj {
short sqlLen;
char searchAlgorithm : 4;
char compAlgorithm : 4;
char state : 5; // deleted or added, 1: added
char status : 3; // 0: ok, 1: stop stream computing
char status; // 0: ok, 1: stop stream computing
char reserved[16];
int state;
int numOfQueries;
char * pSql;
void * pStream;
@ -499,7 +500,7 @@ int vnodeInitStore();
void vnodeCleanUpVnodes();
void vnodeRemoveVnode(int vnode);
int vnodeRemoveVnode(int vnode);
int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc);

View File

@ -75,6 +75,12 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterSidExtInfo **pSid
void vnodeDecQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterObj **pMeterObjList, int32_t numOfInc);
int32_t vnodeTransferMeterState(SMeterObj* pMeterObj, int32_t state);
void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state);
bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state);
void vnodeSetMeterDeleting(SMeterObj* pMeterObj);
bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid);
#ifdef __cplusplus
}
#endif

View File

@ -445,7 +445,8 @@ int vnodeProcessFreeVnodeRequest(char *pMsg) {
}
dTrace("vid:%d receive free vnode message", pFree->vnode);
vnodeRemoveVnode(pFree->vnode);
int32_t code = vnodeRemoveVnode(pFree->vnode);
assert(code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS);
pStart = (char *)malloc(128);
if (pStart == NULL) return 0;
@ -453,7 +454,7 @@ int vnodeProcessFreeVnodeRequest(char *pMsg) {
*pStart = TSDB_MSG_TYPE_FREE_VNODE_RSP;
pMsg = pStart + 1;
*pMsg = 0;
*pMsg = code;
vnodeSendMsgToMgmt(pStart);
return 0;

View File

@ -1140,54 +1140,13 @@ static void mgmtReorganizeMetersInMetricMeta(STabObj *pMetric, SMetricMetaMsg *p
startPos[1] = (int32_t)pRes->num;
}
/* if pInfo->limit == 0, the query will be intercepted by sdk, and wont be
* sent to mnode */
assert(pInfo->limit == -1 || pInfo->limit > 0);
int32_t numOfTotal = 0;
if (pInfo->offset >= numOfSubset) {
numOfTotal = 0;
} else if (numOfSubset == 1) {
// no 'groupBy' clause, all tables returned
numOfTotal = pRes->num;
} else {
/* there is a offset value of group */
int32_t start = 0;
int32_t end = 0;
if (pInfo->orderType == TSQL_SO_ASC) {
start = startPos[pInfo->offset];
if (pInfo->limit + pInfo->offset >= numOfSubset || pInfo->limit == -1) {
/* all results are required */
end = startPos[numOfSubset];
} else {
end = startPos[pInfo->limit + pInfo->offset];
}
} else {
end = startPos[numOfSubset - pInfo->offset];
if (pInfo->limit + pInfo->offset >= numOfSubset || pInfo->limit == -1) {
start = startPos[0];
} else {
start = startPos[numOfSubset - pInfo->limit - pInfo->offset];
}
}
numOfTotal = end - start;
assert(numOfTotal > 0);
memmove(pRes->pRes, pRes->pRes + start, numOfTotal * POINTER_BYTES);
}
/*
* sort the result according to vgid to ensure that meters with the same vgid are
* contiguous in the result list
*/
__compar_fn_t functor = (pRes->nodeType == TAST_NODE_TYPE_METER_PTR) ? tabObjVGIDComparator : nodeVGIDComparator;
qsort(pRes->pRes, numOfTotal, POINTER_BYTES, functor);
qsort(pRes->pRes, (size_t) pRes->num, POINTER_BYTES, functor);
pRes->num = numOfTotal;
free(descriptor->pTagSchema);
free(descriptor);
free(startPos);

View File

@ -340,19 +340,33 @@ void vnodeCommitOver(SVnodeObj *pVnode) {
pthread_mutex_unlock(&pPool->vmutex);
}
/*
 * Poll the vnode's cache pool until its commitInProcess flag is cleared, or until the retry
 * budget is used up.
 *
 * The flag is read under pPool->vmutex. Between polls the function calls taosMsleep(10);
 * with 1000 rounds that is presumably about 10 seconds in total, not 100 - TODO confirm the
 * unit of taosMsleep().
 */
static void vnodeWaitForCommitComplete(SVnodeObj *pVnode) {
  SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);

  // upper bound on the number of polls
  const int32_t totalCount = 1000;
  int32_t       count = 0;

  // all meters are marked as dropped, so the commit is expected to abort very quickly
  while (count++ < totalCount) {
    pthread_mutex_lock(&pPool->vmutex);
    int32_t commitInProcess = pPool->commitInProcess;
    pthread_mutex_unlock(&pPool->vmutex);

    if (!commitInProcess) {
      // commit has finished: stop polling instead of re-locking the mutex for the remaining rounds
      break;
    }

    dWarn("vid:%d still in commit, wait for completed", pVnode->vnode);
    taosMsleep(10);
  }
}
void vnodeCancelCommit(SVnodeObj *pVnode) {
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
if (pPool == NULL) return;
pthread_mutex_lock(&pPool->vmutex);
if (pPool->commitInProcess) {
pPool->commitInProcess = 0;
pthread_cancel(pVnode->commitThread);
}
pthread_mutex_unlock(&pPool->vmutex);
vnodeWaitForCommitComplete(pVnode);
taosTmrReset(vnodeProcessCommitTimer, pVnode->cfg.commitTime * 1000, pVnode, vnodeTmrCtrl, &pVnode->commitTimer);
}

View File

@ -26,6 +26,7 @@
#include "tsdb.h"
#include "vnode.h"
#include "vnodeUtil.h"
typedef struct {
int sversion;
@ -160,13 +161,17 @@ size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) {
if (*(int *)(cont+head.contLen) != simpleCheck) break;
SMeterObj *pObj = pVnode->meterList[head.sid];
if (pObj == NULL) {
dError(
"vid:%d, sid:%d not exists, ignore data in commit log, "
"contLen:%d action:%d",
dError("vid:%d, sid:%d not exists, ignore data in commit log, contLen:%d action:%d",
vnode, head.sid, head.contLen, head.action);
continue;
}
// the meter is being dropped: skip its records in the commit log
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
  // the format has five conversions; pObj->meterId supplies the "id:%s" argument that was missing
  dWarn("vid:%d sid:%d id:%s, meter is dropped, ignore data in commit log, contLen:%d action:%d",
        vnode, head.sid, pObj->meterId, head.contLen, head.action);
  continue;
}
int32_t numOfPoints = 0;
(*vnodeProcessAction[head.action])(pObj, cont, head.contLen, TSDB_DATA_SOURCE_LOG, NULL, head.sversion,
&numOfPoints);

View File

@ -577,8 +577,20 @@ _again:
// read compInfo
for (sid = 0; sid < pCfg->maxSessions; ++sid) {
if (pVnode->meterList == NULL) { // vnode is being freed, abort
goto _over;
}
pObj = (SMeterObj *)(pVnode->meterList[sid]);
if (pObj == NULL) continue;
if (pObj == NULL) {
continue;
}
// meter is going to be deleted, abort
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
dWarn("vid:%d sid:%d is dropped, ignore this meter", vnode, sid);
continue;
}
pMeter = meterInfo + sid;
pHeader = ((SCompHeader *)tmem) + sid;
@ -672,8 +684,9 @@ _again:
pointsReadLast = pMeter->lastBlock.numOfPoints;
query.over = 0;
headInfo.totalStorage -= (pointsReadLast * pObj->bytesPerPoint);
dTrace("vid:%d sid:%d id:%s, points:%d in last block will be merged to new block",
pObj->vnode, pObj->sid, pObj->meterId, pointsReadLast);
pObj->vnode, pObj->sid, pObj->meterId, pointsReadLast);
}
pMeter->changed = 1;
@ -717,8 +730,8 @@ _again:
}
dTrace("vid:%d sid:%d id:%s, %d points are committed, lastKey:%lld slot:%d pos:%d newNumOfBlocks:%d",
pObj->vnode, pObj->sid, pObj->meterId, pMeter->committedPoints, pObj->lastKeyOnFile, query.slot, query.pos,
pMeter->newNumOfBlocks);
pObj->vnode, pObj->sid, pObj->meterId, pMeter->committedPoints, pObj->lastKeyOnFile, query.slot, query.pos,
pMeter->newNumOfBlocks);
if (pMeter->committedPoints > 0) {
pMeter->commitSlot = query.slot;

View File

@ -24,6 +24,7 @@
#include "vnode.h"
#include "vnodeMgmt.h"
#include "vnodeShell.h"
#include "vnodeShell.h"
#include "vnodeUtil.h"
#pragma GCC diagnostic ignored "-Wpointer-sign"
#pragma GCC diagnostic ignored "-Wint-conversion"
@ -281,14 +282,32 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
SShellObj * pShell = pImport->pShell;
pImport->retry++;
pObj->state = TSDB_METER_STATE_IMPORTING;
//slow query will block the import operation
int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_IMPORTING);
if (state >= TSDB_METER_STATE_DELETING) {
dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d",
pObj->vnode, pObj->sid, pObj->meterId, state);
return;
}
int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
//if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY
int32_t commitInProcess = 0;
pthread_mutex_lock(&pPool->vmutex);
if (pPool->commitInProcess || pObj->numOfQueries > 0) {
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || state != TSDB_METER_STATE_READY) {
pthread_mutex_unlock(&pPool->vmutex);
pObj->state = TSDB_METER_STATE_READY;
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
if (pImport->retry < 1000) {
dTrace("vid:%d sid:%d id:%s, commit in process, try to import later", pObj->vnode, pObj->sid, pObj->meterId);
dTrace("vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready."
"commitInProcess:%d, numOfQueries:%d, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
commitInProcess, num, state);
taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl);
return;
} else {
@ -304,7 +323,8 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
}
}
pObj->state = TSDB_METER_STATE_READY;
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
pVnode->version++;
// send response back to shell
@ -862,15 +882,19 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
}
if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) {
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
vnodeTransferMeterState(pObj, TSDB_METER_STATE_INSERT);
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported);
if (pShell) {
pShell->code = code;
pShell->numOfTotalPoints += pointsImported;
}
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
} else {
SImportInfo *pNew, import;
pObj->state = TSDB_METER_STATE_IMPORTING;
dTrace("vid:%d sid:%d id:%s, import %d rows data", pObj->vnode, pObj->sid, pObj->meterId, rows);
memset(&import, 0, sizeof(import));
import.firstKey = *((TSKEY *)(payload));
@ -880,10 +904,19 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
import.payload = payload;
import.rows = rows;
int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
int32_t commitInProcess = 0;
pthread_mutex_lock(&pPool->vmutex);
if (pPool->commitInProcess || pObj->numOfQueries > 0) {
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) {
pthread_mutex_unlock(&pPool->vmutex);
pObj->state = TSDB_METER_STATE_READY;
//restore meter state
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
pNew = (SImportInfo *)malloc(sizeof(SImportInfo));
memcpy(pNew, &import, sizeof(SImportInfo));
@ -892,8 +925,9 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pNew->payload = malloc(payloadLen);
memcpy(pNew->payload, payload, payloadLen);
dTrace("vid:%d sid:%d id:%s, commit/query:%d in process, import later, ", pObj->vnode, pObj->sid, pObj->meterId,
pObj->numOfQueries);
dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid,
pObj->meterId, commitInProcess, pObj->numOfQueries);
taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl);
return 0;
} else {
@ -905,9 +939,10 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pShell->numOfTotalPoints += import.importedRows;
}
}
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
}
pObj->state = TSDB_METER_STATE_READY;
pVnode->version++;
if (pShell) {
@ -918,6 +953,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
return 0;
}
//todo abort from the procedure if the meter is going to be dropped
int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) {
int code = 0;

View File

@ -47,6 +47,8 @@ void vnodeFreeMeterObj(SMeterObj *pObj) {
if (vnodeList[pObj->vnode].meterList != NULL) {
vnodeList[pObj->vnode].meterList[pObj->sid] = NULL;
}
memset(pObj->meterId, 0, tListLen(pObj->meterId));
tfree(pObj);
}
@ -143,7 +145,7 @@ int vnodeSaveMeterObjToFile(SMeterObj *pObj) {
memcpy(buffer, pObj, offsetof(SMeterObj, reserved));
memcpy(buffer + offsetof(SMeterObj, reserved), pObj->schema, pObj->numOfColumns * sizeof(SColumn));
memcpy(buffer + offsetof(SMeterObj, reserved) + pObj->numOfColumns * sizeof(SColumn), pObj->pSql, pObj->sqlLen);
taosCalcChecksumAppend(0, buffer, new_length);
taosCalcChecksumAppend(0, (uint8_t *)buffer, new_length);
if (offset == 0 || length < new_length) { // New, append to file end
fseek(fp, 0, SEEK_END);
@ -208,7 +210,7 @@ int vnodeSaveAllMeterObjToFile(int vnode) {
memcpy(buffer, pObj, offsetof(SMeterObj, reserved));
memcpy(buffer + offsetof(SMeterObj, reserved), pObj->schema, pObj->numOfColumns * sizeof(SColumn));
memcpy(buffer + offsetof(SMeterObj, reserved) + pObj->numOfColumns * sizeof(SColumn), pObj->pSql, pObj->sqlLen);
taosCalcChecksumAppend(0, buffer, new_length);
taosCalcChecksumAppend(0, (uint8_t *)buffer, new_length);
if (offset == 0 || length > new_length) { // New, append to file end
new_offset = fseek(fp, 0, SEEK_END);
@ -391,7 +393,7 @@ int vnodeOpenMetersVnode(int vnode) {
fseek(fp, offset, SEEK_SET);
if (fread(buffer, length, 1, fp) <= 0) break;
if (taosCheckChecksumWhole(buffer, length)) {
if (taosCheckChecksumWhole((uint8_t *)buffer, length)) {
vnodeRestoreMeterObj(buffer, length - sizeof(TSCKSUM));
} else {
dError("meter object file is broken since checksum mismatch, vnode: %d sid: %d, try to recover", vnode, sid);
@ -440,7 +442,7 @@ int vnodeCreateMeterObj(SMeterObj *pNew, SConnSec *pSec) {
}
dTrace("vid:%d sid:%d id:%s, update schema", pNew->vnode, pNew->sid, pNew->meterId);
if (pObj->state != TSDB_METER_STATE_UPDATING) vnodeUpdateMeter(pNew, NULL);
if (!vnodeIsMeterState(pObj, TSDB_METER_STATE_UPDATING)) vnodeUpdateMeter(pNew, NULL);
return TSDB_CODE_SUCCESS;
}
@ -483,27 +485,20 @@ int vnodeRemoveMeterObj(int vnode, int sid) {
if (vnodeList[vnode].meterList == NULL) return 0;
pObj = vnodeList[vnode].meterList[sid];
if ((pObj == NULL) || (pObj->state == TSDB_METER_STATE_DELETED)) return 0;
if (pObj->state == TSDB_METER_STATE_IMPORTING) return TSDB_CODE_ACTION_IN_PROGRESS;
int32_t retFlag = 0;
pthread_mutex_lock(&vnodeList[vnode].vmutex);
pObj->state = TSDB_METER_STATE_DELETING;
if (pObj->numOfQueries > 0) {
retFlag = TSDB_CODE_ACTION_IN_PROGRESS;
dWarn("vid:%d sid:%d id:%s %d queries executing on it, wait query to be finished",
vnode, pObj->sid, pObj->meterId, pObj->numOfQueries);
if (pObj == NULL) {
return TSDB_CODE_SUCCESS;
}
pthread_mutex_unlock(&vnodeList[vnode].vmutex);
if (retFlag != 0) return retFlag;
// after remove this meter, change its stat to DELETED
if (!vnodeIsSafeToDeleteMeter(&vnodeList[vnode], sid)) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
// after remove this meter, change its state to DELETED
pObj->state = TSDB_METER_STATE_DELETED;
pObj->timeStamp = taosGetTimestampMs();
vnodeList[vnode].lastRemove = pObj->timeStamp;
vnodeRemoveStream(pObj);
pObj->meterId[0] = 0;
vnodeSaveMeterObjToFile(pObj);
vnodeFreeMeterObj(pObj);
@ -578,10 +573,19 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
if (pVnode->lastKeyOnFile > pVnode->cfg.daysToKeep * tsMsPerDay[pVnode->cfg.precision] + firstKey) {
dError("vid:%d sid:%d id:%s, vnode lastKeyOnFile:%lld, data is too old to insert, key:%lld", pObj->vnode, pObj->sid,
pObj->meterId, pVnode->lastKeyOnFile, firstKey);
return TSDB_CODE_OTHERS;
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
}
for (i = 0; i < numOfPoints; ++i) {
// meter will be dropped, abort current insertion
if (pObj->state >= TSDB_METER_STATE_DELETING) {
dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->state);
code = TSDB_CODE_INVALID_SESSION_ID;
break;
}
if (*((TSKEY *)pData) <= pObj->lastKey) {
dWarn("vid:%d sid:%d id:%s, received key:%ld not larger than lastKey:%ld", pObj->vnode, pObj->sid, pObj->meterId,
*((TSKEY *)pData), pObj->lastKey);
@ -632,9 +636,11 @@ void vnodeProcessUpdateSchemaTimer(void *param, void *tmrId) {
pthread_mutex_lock(&pPool->vmutex);
if (pPool->commitInProcess) {
dTrace("vid:%d sid:%d mid:%s, commiting in process, commit later", pObj->vnode, pObj->sid, pObj->meterId);
if (taosTmrStart(vnodeProcessUpdateSchemaTimer, 10, pObj, vnodeTmrCtrl) == NULL)
pObj->state = TSDB_METER_STATE_READY;
dTrace("vid:%d sid:%d mid:%s, committing in process, commit later", pObj->vnode, pObj->sid, pObj->meterId);
if (taosTmrStart(vnodeProcessUpdateSchemaTimer, 10, pObj, vnodeTmrCtrl) == NULL) {
vnodeClearMeterState(pObj, TSDB_METER_STATE_UPDATING);
}
pthread_mutex_unlock(&pPool->vmutex);
return;
}
@ -649,41 +655,54 @@ void vnodeUpdateMeter(void *param, void *tmrId) {
SMeterObj *pNew = (SMeterObj *)param;
if (pNew == NULL || pNew->vnode < 0 || pNew->sid < 0) return;
if (vnodeList[pNew->vnode].meterList == NULL) {
SVnodeObj* pVnode = &vnodeList[pNew->vnode];
if (pVnode->meterList == NULL) {
dTrace("vid:%d sid:%d id:%s, vnode is deleted, abort update schema", pNew->vnode, pNew->sid, pNew->meterId);
free(pNew->schema);
free(pNew);
return;
}
SMeterObj *pObj = vnodeList[pNew->vnode].meterList[pNew->sid];
if (pObj == NULL) {
SMeterObj *pObj = pVnode->meterList[pNew->sid];
if (pObj == NULL || vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
dTrace("vid:%d sid:%d id:%s, meter is deleted, abort update schema", pNew->vnode, pNew->sid, pNew->meterId);
free(pNew->schema);
free(pNew);
return;
}
pObj->state = TSDB_METER_STATE_UPDATING;
int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_UPDATING);
if (state >= TSDB_METER_STATE_DELETING) {
dError("vid:%d sid:%d id:%s, meter is deleted, failed to update, state:%d",
pObj->vnode, pObj->sid, pObj->meterId, state);
return;
}
if (pObj->numOfQueries > 0) {
int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
if (num > 0 || state != TSDB_METER_STATE_READY) {
dTrace("vid:%d sid:%d id:%s, update failed, retry later, numOfQueries:%d, state:%d",
pNew->vnode, pNew->sid, pNew->meterId, num, state);
// retry update meter in 50ms
if (taosTmrStart(vnodeUpdateMeter, 50, pNew, vnodeTmrCtrl) == NULL) {
dError("vid:%d sid:%d id:%s, failed to start update timer", pNew->vnode, pNew->sid, pNew->meterId);
pObj->state = TSDB_METER_STATE_READY;
dError("vid:%d sid:%d id:%s, failed to start update timer, no retry", pNew->vnode, pNew->sid, pNew->meterId);
free(pNew->schema);
free(pNew);
}
dTrace("vid:%d sid:%d id:%s, there are ongoing queries, update later", pNew->vnode, pNew->sid, pNew->meterId);
return;
}
// commit first
if (!vnodeIsCacheCommitted(pObj)) {
// commit
// commit data first
if (taosTmrStart(vnodeProcessUpdateSchemaTimer, 0, pObj, vnodeTmrCtrl) == NULL) {
dError("vid:%d sid:%d id:%s, failed to start commit timer", pObj->vnode, pObj->sid, pObj->meterId);
pObj->state = TSDB_METER_STATE_READY;
vnodeClearMeterState(pObj, TSDB_METER_STATE_UPDATING);
free(pNew->schema);
free(pNew);
return;
@ -691,13 +710,14 @@ void vnodeUpdateMeter(void *param, void *tmrId) {
if (taosTmrStart(vnodeUpdateMeter, 50, pNew, vnodeTmrCtrl) == NULL) {
dError("vid:%d sid:%d id:%s, failed to start update timer", pNew->vnode, pNew->sid, pNew->meterId);
pObj->state = TSDB_METER_STATE_READY;
vnodeClearMeterState(pObj, TSDB_METER_STATE_UPDATING);
free(pNew->schema);
free(pNew);
}
dTrace("vid:%d sid:%d meterId:%s, there are data in cache, commit first, update later",
pNew->vnode, pNew->sid, pNew->meterId);
vnodeClearMeterState(pObj, TSDB_METER_STATE_UPDATING);
return;
}
@ -716,7 +736,7 @@ void vnodeUpdateMeter(void *param, void *tmrId) {
pObj->sversion = pNew->sversion;
vnodeSaveMeterObjToFile(pObj);
pObj->state = TSDB_METER_STATE_READY;
vnodeClearMeterState(pObj, TSDB_METER_STATE_UPDATING);
dTrace("vid:%d sid:%d id:%s, schema is updated", pNew->vnode, pNew->sid, pNew->meterId);
free(pNew);

View File

@ -1730,6 +1730,17 @@ static int64_t getOldestKey(int32_t numOfFiles, int64_t fileId, SVnodeCfg *pCfg)
/*
 * Report whether the query that owns pQuery has to stop running.
 *
 * The query is treated as killed when the meter it reads has reached the
 * deleting state (the killed flag is then latched on the query info so later
 * checks see it as well), or when the killed flag is already set to 1.
 */
bool isQueryKilled(SQuery *pQuery) {
  SQInfo *pInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);

  // the queried meter is going to be dropped soon: stop the current query ASAP
  if (vnodeIsMeterState(pInfo->pObj, TSDB_METER_STATE_DELETING)) {
    pInfo->killed = 1;
    return true;
  }

  return pInfo->killed == 1;
}

View File

@ -15,12 +15,13 @@
#define _DEFAULT_SOURCE
#include "vnodeShell.h"
#include <arpa/inet.h>
#include <assert.h>
#include <endian.h>
#include <stdint.h>
#include "taosmsg.h"
#include "vnode.h"
#include "vnodeShell.h"
#include "tschemautil.h"
#include "textbuffer.h"
@ -28,6 +29,7 @@
#include "vnode.h"
#include "vnodeRead.h"
#include "vnodeUtil.h"
#pragma GCC diagnostic ignored "-Wint-conversion"
void * pShellServer = NULL;
@ -87,6 +89,7 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle);
// set in query processing flag
if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
vnodeProcessQueryRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
} else if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
@ -96,7 +99,7 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
} else {
dError("%s is not processed", taosMsg[pMsg->msgType]);
}
return pObj;
}
@ -157,16 +160,30 @@ int vnodeOpenShellVnode(int vnode) {
return 0;
}
void vnodeCloseShellVnode(int vnode) {
taosCloseRpcChann(pShellServer, vnode);
/*
 * Timer callback that releases the shell resources of one vnode.
 *
 * param points to an int32_t holding the vnode id; the callback reads the id,
 * closes the RPC channel of that vnode, frees its SShellObj array and finally
 * releases param itself. tmrId is not used.
 */
void vnodeDelayedFreeResource(void *param, void *tmrId) {
  int32_t *pVnodeId = (int32_t *)param;
  int32_t  vid = *pVnodeId;

  taosCloseRpcChann(pShellServer, vid);  // close connection
  tfree(shellList[vid]);                 // free SShellObj
  tfree(param);
}
/*
 * Close the shell side of a vnode.
 *
 * The query handles attached to every session of the vnode are released right
 * away. The RPC channel and the SShellObj array are released 5 seconds later by
 * vnodeDelayedFreeResource: a message may still be waiting in the task queue,
 * and freeing these resources immediately would crash when it is processed.
 *
 * Does nothing when the vnode has no shell objects.
 */
void vnodeCloseShellVnode(int vnode) {
  if (shellList[vnode] == NULL) return;

  for (int i = 0; i < vnodeList[vnode].cfg.maxSessions; ++i) {
    vnodeFreeQInfo(shellList[vnode][i].qhandle, true);
  }

  // shellList[vnode] is deliberately not freed here; the delayed callback does it

  // heap copy of the vnode id, handed over to (and released by) the timer callback
  int32_t *v = malloc(sizeof(*v));
  if (v == NULL) {
    dTrace("vid:%d, failed to allocate timer parameter, resources are not freed", vnode);
    return;
  }
  *v = vnode;

  dTrace("vid:%d, delay 5sec to free resources", vnode);
  if (taosTmrStart(vnodeDelayedFreeResource, 5000, v, vnodeTmrCtrl) == NULL) {
    // the callback will never run, so release the parameter instead of leaking it
    dTrace("vid:%d, failed to start timer to free resources", vnode);
    tfree(v);
  }
}
void vnodeCleanUpShell() {
@ -488,24 +505,38 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
int subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int sversion = htonl(pBlocks->sversion);
if (pMeterObj->state == TSDB_METER_STATE_READY) {
if (pSubmit->import)
code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
int32_t state = TSDB_METER_STATE_READY;
if (pSubmit->import) {
state = vnodeTransferMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
} else {
state = vnodeTransferMeterState(pMeterObj, TSDB_METER_STATE_INSERT);
}
if (state == TSDB_METER_STATE_READY) {
// meter status is ready for insert/import
if (pSubmit->import) {
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints);
else
code = vnodeInsertPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
} else {
code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints);
if (code != 0) break;
} else if (pMeterObj->state >= TSDB_METER_STATE_DELETING) {
dTrace("vid:%d sid:%d id:%s, is is removed, state:", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
pMeterObj->state);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
break;
} else { // importing state or others
dTrace("vid:%d sid:%d id:%s, try again since in state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
pMeterObj->state);
code = TSDB_CODE_ACTION_IN_PROGRESS;
break;
vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_INSERT);
}
if (code != TSDB_CODE_SUCCESS) {break;}
} else {
if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) {
dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
pMeterObj->state);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
break;
} else {// waiting for 300ms by default and try again
dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pMeterObj->vnode, pMeterObj->sid,
pMeterObj->meterId, pMeterObj->state);
code = TSDB_CODE_ACTION_IN_PROGRESS;
break;
}
}
numOfTotalPoints += numOfPoints;

View File

@ -85,13 +85,42 @@ int vnodeOpenVnode(int vnode) {
return 0;
}
void vnodeCloseVnode(int vnode) {
if (vnodeList == NULL) return;
/*
 * Try to mark every meter of the vnode as deleted.
 *
 * Each session id is checked with vnodeIsSafeToDeleteMeter; meters that pass the
 * check are set to TSDB_METER_STATE_DELETED, the others are left for a later
 * attempt. All sessions are visited even after one of them fails the check.
 *
 * Returns TSDB_CODE_SUCCESS when every meter passed the check (or the vnode has
 * no meter list), TSDB_CODE_ACTION_IN_PROGRESS otherwise.
 */
static int32_t vnodeMarkAllMetersDropped(SVnodeObj* pVnode) {
  if (pVnode->meterList == NULL) {
    assert(pVnode->cfg.maxSessions == 0);
    return TSDB_CODE_SUCCESS;
  }

  int32_t result = TSDB_CODE_SUCCESS;

  for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) {
    if (!vnodeIsSafeToDeleteMeter(pVnode, sid)) {
      // still busy: remember it, but keep going through the remaining meters
      result = TSDB_CODE_ACTION_IN_PROGRESS;
      continue;
    }

    // safe to delete: flag the meter (if there is one) as deleted
    SMeterObj* pMeter = pVnode->meterList[sid];
    if (pMeter != NULL) {
      pMeter->state = TSDB_METER_STATE_DELETED;
    }
  }

  return result;
}
int vnodeCloseVnode(int vnode) {
if (vnodeList == NULL) return TSDB_CODE_SUCCESS;
SVnodeObj* pVnode = &vnodeList[vnode];
pthread_mutex_lock(&dmutex);
if (vnodeList[vnode].cfg.maxSessions == 0) {
if (pVnode->cfg.maxSessions == 0) {
pthread_mutex_unlock(&dmutex);
return;
return TSDB_CODE_SUCCESS;
}
// set the meter is dropped flag
if (vnodeMarkAllMetersDropped(pVnode) != TSDB_CODE_SUCCESS) {
pthread_mutex_unlock(&dmutex);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
vnodeCloseStream(vnodeList + vnode);
@ -111,6 +140,7 @@ void vnodeCloseVnode(int vnode) {
vnodeCalcOpenVnodes();
pthread_mutex_unlock(&dmutex);
return TSDB_CODE_SUCCESS;
}
int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc) {
@ -182,25 +212,23 @@ void vnodeRemoveDataFiles(int vnode) {
dTrace("vnode %d is removed!", vnode);
}
void vnodeRemoveVnode(int vnode) {
if (vnodeList == NULL) return;
int vnodeRemoveVnode(int vnode) {
if (vnodeList == NULL) return TSDB_CODE_SUCCESS;
if (vnodeList[vnode].cfg.maxSessions > 0) {
vnodeCloseVnode(vnode);
int32_t ret = vnodeCloseVnode(vnode);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
vnodeRemoveDataFiles(vnode);
// sprintf(cmd, "rm -rf %s/vnode%d", tsDirectory, vnode);
// if ( system(cmd) < 0 ) {
// dError("vid:%d, failed to run command %s vnode, reason:%s", vnode, cmd, strerror(errno));
// } else {
// dTrace("vid:%d, this vnode is deleted!!!", vnode);
// }
} else {
dTrace("vid:%d, max sessions:%d, this vnode already dropped!!!", vnode, vnodeList[vnode].cfg.maxSessions);
vnodeList[vnode].cfg.maxSessions = 0;
vnodeList[vnode].cfg.maxSessions = 0; //reset value
vnodeCalcOpenVnodes();
}
return TSDB_CODE_SUCCESS;
}
int vnodeInitStore() {

View File

@ -51,8 +51,17 @@ void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
}
contLen += sizeof(SSubmitMsg);
int32_t numOfPoints = 0;
vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints);
int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_INSERT);
if (state == TSDB_METER_STATE_READY) {
vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints);
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
} else {
dError("vid:%d sid:%d id:%s, failed to insert continuous query results, state:%d", pObj->vnode, pObj->sid,
pObj->meterId, state);
}
assert(numOfPoints >= 0 && numOfPoints <= 1);
tfree(pTemp);
@ -76,7 +85,7 @@ void vnodeOpenStreams(void *param, void *tmrId) {
for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) {
pObj = pVnode->meterList[sid];
if (pObj == NULL || pObj->sqlLen == 0 || pObj->status == 1 || pObj->state == TSDB_METER_STATE_DELETED) continue;
if (pObj == NULL || pObj->sqlLen == 0 || vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) continue;
dTrace("vid:%d sid:%d id:%s, open stream:%s", pObj->vnode, sid, pObj->meterId, pObj->pSql);

View File

@ -361,6 +361,7 @@ void vnodeUpdateFilterColumnIndex(SQuery* pQuery) {
// TODO support k<12 and k<>9
int32_t vnodeCreateFilterInfo(void* pQInfo, SQuery* pQuery) {
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
if (pQuery->colList[i].data.filterOn > 0) {
pQuery->numOfFilterCols++;
@ -401,8 +402,6 @@ int32_t vnodeCreateFilterInfo(void* pQInfo, SQuery* pQuery) {
pFilterInfo->fp = rangeFilterArray[2];
}
} else {
assert(lower == TSDB_RELATION_LARGE);
if (upper == TSDB_RELATION_LESS_EQUAL) {
pFilterInfo->fp = rangeFilterArray[3];
} else {
@ -421,6 +420,7 @@ int32_t vnodeCreateFilterInfo(void* pQInfo, SQuery* pQuery) {
pFilterInfo->fp = filterArray[upper];
}
}
pFilterInfo->elemSize = bytes;
j++;
}
@ -470,6 +470,18 @@ bool vnodeIsProjectionQuery(SSqlFunctionExpr* pExpr, int32_t numOfOutput) {
return true;
}
/*
 * pMeter->state may be changed concurrently by vnodeIsSafeToDeleteMeter and by the import/update
 * processors, so the check of the state is not always correct.
 *
 * If the check of the meter state is passed, import/update/deleting is actually blocked by the
 * current query processing, while later queries are denied.
 *
 * 1. vnodeIsSafeToDeleteMeter will wait for this function to complete, since it also uses the
 *    vmutex to check numOfQueries.
 * 2. import will check numOfQueries again after setting the state to TSDB_METER_STATE_IMPORTING,
 *    and the vmutex is used there as well.
 * 3. insert has nothing to do with the query processing.
 */
int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSids, SMeterObj** pMeterObjList,
int32_t* numOfInc) {
SVnodeObj* pVnode = &vnodeList[pQueryMsg->vnode];
@ -477,21 +489,24 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
int32_t num = 0;
int32_t code = TSDB_CODE_SUCCESS;
// check all meter metadata to ensure all metadata are identical.
for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) {
SMeterObj* pMeter = pVnode->meterList[pSids[i]->sid];
if (pMeter == NULL || pMeter->state != TSDB_METER_STATE_READY) {
if (pMeter == NULL) {
if (pMeter == NULL || (pMeter->state > TSDB_METER_STATE_INSERT)) {
if (pMeter == NULL || vnodeIsMeterState(pMeter, TSDB_METER_STATE_DELETING)) {
code = TSDB_CODE_NOT_ACTIVE_SESSION;
dError("qmsg:%p, vid:%d sid:%d, not there", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid);
dError("qmsg:%p, vid:%d sid:%d, not there or will be dropped", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid);
vnodeSendMeterCfgMsg(pQueryMsg->vnode, pSids[i]->sid);
} else {
} else {//update or import
code = TSDB_CODE_ACTION_IN_PROGRESS;
dTrace("qmsg:%p, vid:%d sid:%d id:%s, it is in state:%d, wait!", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid,
pMeter->meterId, pMeter->state);
}
} else {
/*
* vnodeIsSafeToDeleteMeter will wait for this function complete, and then it can
* check if the numOfQueries is 0 or not.
*/
pMeterObjList[(*numOfInc)++] = pMeter;
__sync_fetch_and_add(&pMeter->numOfQueries, 1);
@ -517,7 +532,6 @@ void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList,
SMeterObj* pMeter = pMeterObjList[i];
if (pMeter != NULL) { // here, do not need to lock to perform operations
assert(pMeter->state != TSDB_METER_STATE_DELETING && pMeter->state != TSDB_METER_STATE_DELETED);
__sync_fetch_and_sub(&pMeter->numOfQueries, 1);
if (pMeter->numOfQueries > 0) {
@ -571,3 +585,66 @@ void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj) {
}
}
}
/*
 * Atomically switch the meter from TSDB_METER_STATE_READY to the given state.
 *
 * Returns the state observed before the attempt: TSDB_METER_STATE_READY means
 * the switch took place, any other value means the meter state was left as is.
 */
int32_t vnodeTransferMeterState(SMeterObj* pMeterObj, int32_t state) {
  int32_t previous = __sync_val_compare_and_swap(&pMeterObj->state, TSDB_METER_STATE_READY, state);
  return previous;
}
/*
 * Remove the given state flag(s) from the meter state.
 *
 * The update is done atomically: the state is also modified from other threads
 * (compare-and-swap in vnodeTransferMeterState, the deleting flag being OR-ed
 * in), and a plain read-modify-write here could overwrite such a concurrent
 * change, e.g. silently drop a deleting flag set while an insert finishes.
 */
void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state) {
  __sync_fetch_and_and(&pMeterObj->state, ~state);
}
/*
 * Test the meter state against the given state.
 *
 * - TSDB_METER_STATE_READY:    true only when the state is exactly READY.
 * - TSDB_METER_STATE_DELETING: true when the state is DELETING or any larger value.
 * - any other state:           true when all bits of that state are set.
 */
bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state) {
  if (state == TSDB_METER_STATE_READY) {
    return pMeterObj->state == TSDB_METER_STATE_READY;
  }

  if (state == TSDB_METER_STATE_DELETING) {
    return pMeterObj->state >= state;
  }

  // flag style states: every requested bit has to be present
  return (pMeterObj->state & state) == state;
}
/*
 * Add the TSDB_METER_STATE_DELETING flag to the meter state; a NULL meter is ignored.
 *
 * The flag is OR-ed in atomically, because the state can be changed by other
 * threads at the same time (state transfer by compare-and-swap, state flags
 * being cleared when an operation finishes); a plain read-modify-write could
 * lose either update.
 */
void vnodeSetMeterDeleting(SMeterObj* pMeterObj) {
  if (pMeterObj == NULL) {
    return;
  }

  __sync_fetch_and_or(&pMeterObj->state, TSDB_METER_STATE_DELETING);
}
/*
 * Check whether the meter at session id sid can be deleted right now, and move
 * it towards the deleting state as a side effect.
 *
 * Returns true when there is no meter at sid, when the meter is already marked
 * deleted, or when the meter is in the deleting state and no query runs on it.
 * Returns false when the meter is busy with another operation or still has
 * running queries; the deleting state stays set so the caller can retry later.
 */
bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid) {
  SMeterObj* pMeter = pVnode->meterList[sid];
  if (pMeter == NULL) {
    return true;
  }
  if (vnodeIsMeterState(pMeter, TSDB_METER_STATE_DELETED)) {
    return true;
  }

  int32_t before = vnodeTransferMeterState(pMeter, TSDB_METER_STATE_DELETING);

  /*
   * neither ready nor deleting: the meter must be in insert/import/update.
   * set the deleting flag and wait for that procedure to be completed.
   */
  bool busy = (before != TSDB_METER_STATE_READY) && (before < TSDB_METER_STATE_DELETING);
  if (busy) {
    vnodeSetMeterDeleting(pMeter);
    dWarn("vid:%d sid:%d id:%s, can not be deleted, state:%d, wait", pMeter->vnode, pMeter->sid, pMeter->meterId, before);
    return false;
  }

  /*
   * running queries will stop ASAP since the meter state is TSDB_METER_STATE_DELETING now,
   * and new queries abort because the meter is being deleted.
   */
  pthread_mutex_lock(&pVnode->vmutex);
  bool idle = !(pMeter->numOfQueries > 0);
  if (!idle) {
    dWarn("vid:%d sid:%d id:%s %d queries executing on it, wait query to be finished",
          pMeter->vnode, pMeter->sid, pMeter->meterId, pMeter->numOfQueries);
  }
  pthread_mutex_unlock(&pVnode->vmutex);

  return idle;
}

View File

@ -1532,7 +1532,7 @@ void tColModelDisplayEx(tColModel *pModel, void *pData, int32_t numOfRows, int32
}
////////////////////////////////////////////////////////////////////////////////////////////
void tColModelCompress(tColModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity) {
void tColModelCompact(tColModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity) {
if (inputBuffer->numOfElems == 0 || maxElemsCapacity == inputBuffer->numOfElems) {
return;
}

View File

@ -117,7 +117,7 @@ int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo* pInterpoInfo, int64_t* p
}
}
bool taosHasNoneInterpoPoints(SInterpolationInfo* pInterpoInfo) { return taosNumOfRemainPoints(pInterpoInfo) > 0; }
/* Return true when taosNumOfRemainPoints reports at least one remaining point. */
bool taosHasRemainsDataForInterpolation(SInterpolationInfo* pInterpoInfo) {
  int32_t remaining = taosNumOfRemainPoints(pInterpoInfo);
  return remaining > 0;
}
int32_t taosNumOfRemainPoints(SInterpolationInfo* pInterpoInfo) {
if (pInterpoInfo->rowIdx == -1 || pInterpoInfo->numOfRawDataInRows == 0) {