Merge remote-tracking branch 'origin/develop' into feature/wal

This commit is contained in:
Shengliang Guan 2020-12-28 14:06:01 +08:00
commit 0c28b5ce6c
25 changed files with 134 additions and 361 deletions

1
Jenkinsfile vendored
View File

@ -44,6 +44,7 @@ def pre_test(){
git pull git pull
git fetch git fetch
git checkout ${CHANGE_BRANCH} git checkout ${CHANGE_BRANCH}
git pull
git merge develop git merge develop
cd ${WK} cd ${WK}
git reset --hard git reset --hard

View File

@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER) IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER}) SET(TD_VER_NUMBER ${VERNUMBER})
ELSE () ELSE ()
SET(TD_VER_NUMBER "2.0.11.0") SET(TD_VER_NUMBER "2.0.12.0")
ENDIF () ENDIF ()
IF (DEFINED VERCOMPATIBLE) IF (DEFINED VERCOMPATIBLE)

View File

@ -1,6 +1,6 @@
name: tdengine name: tdengine
base: core18 base: core18
version: '2.0.11.0' version: '2.0.12.0'
icon: snap/gui/t-dengine.svg icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT. summary: an open-source big data platform designed and optimized for IoT.
description: | description: |
@ -72,7 +72,7 @@ parts:
- usr/bin/taosd - usr/bin/taosd
- usr/bin/taos - usr/bin/taos
- usr/bin/taosdemo - usr/bin/taosdemo
- usr/lib/libtaos.so.2.0.11.0 - usr/lib/libtaos.so.2.0.12.0
- usr/lib/libtaos.so.1 - usr/lib/libtaos.so.1
- usr/lib/libtaos.so - usr/lib/libtaos.so

View File

@ -38,12 +38,6 @@ typedef struct SLocalDataSource {
tFilePage filePage; tFilePage filePage;
} SLocalDataSource; } SLocalDataSource;
enum {
TSC_LOCALREDUCE_READY = 0x0,
TSC_LOCALREDUCE_IN_PROGRESS = 0x1,
TSC_LOCALREDUCE_TOBE_FREED = 0x2,
};
typedef struct SLocalReducer { typedef struct SLocalReducer {
SLocalDataSource ** pLocalDataSrc; SLocalDataSource ** pLocalDataSrc;
int32_t numOfBuffer; int32_t numOfBuffer;
@ -56,7 +50,6 @@ typedef struct SLocalReducer {
tFilePage * pTempBuffer; tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx; struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result. int32_t rowSize; // size of each intermediate result.
int32_t status; // denote it is in reduce process, in reduce process, it
bool hasPrevRow; // cannot be released bool hasPrevRow; // cannot be released
bool hasUnprocessedRow; bool hasUnprocessedRow;
tOrderDescriptor * pDesc; tOrderDescriptor * pDesc;

View File

@ -69,9 +69,10 @@ typedef struct STableMeta {
int16_t sversion; int16_t sversion;
int16_t tversion; int16_t tversion;
char sTableId[TSDB_TABLE_FNAME_LEN]; char sTableId[TSDB_TABLE_FNAME_LEN];
SVgroupInfo vgroupInfo; int32_t vgId;
SCorVgroupInfo corVgroupInfo; SCorVgroupInfo corVgroupInfo;
STableId id; STableId id;
// union {int64_t stableUid; SSchema* schema;};
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
} STableMeta; } STableMeta;
@ -307,6 +308,7 @@ typedef struct STscObj {
SRpcCorEpSet *tscCorMgmtEpSet; SRpcCorEpSet *tscCorMgmtEpSet;
void* pDnodeConn; void* pDnodeConn;
pthread_mutex_t mutex; pthread_mutex_t mutex;
int32_t numOfObj; // number of sqlObj from this tscObj
} STscObj; } STscObj;
typedef struct SSubqueryState { typedef struct SSubqueryState {
@ -477,14 +479,14 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
} }
} }
extern SCacheObj* tscMetaCache; extern SCacheObj *tscMetaCache;
extern int tscObjRef;
extern void * tscTmr; extern int tscObjRef;
extern void * tscQhandle; extern void *tscTmr;
extern int tscKeepConn[]; extern void *tscQhandle;
extern int tscNumOfThreads; extern int tscKeepConn[];
extern int tscRefId; extern int tscRefId;
extern int tscNumOfObj; // number of existed sqlObj in current process.
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);

View File

@ -2625,7 +2625,6 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo); char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN); pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
printf("%p, %p\n", pInfo->pHisto, pInfo->pHisto->elems);
return true; return true;
} }

View File

@ -93,7 +93,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
// for top/bottom function, the output of timestamp is the first column // for top/bottom function, the output of timestamp is the first column
int32_t functionId = pExpr->functionId; int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pReducer->pCtx[0].aOutputBuf; pCtx->ptsOutputBuf = pReducer->pCtx[0].aOutputBuf;
pCtx->param[2].i64Key = pQueryInfo->order.order; pCtx->param[2].i64Key = pQueryInfo->order.order;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
@ -493,13 +493,6 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
// there is no more result, so we release all allocated resource // there is no more result, so we release all allocated resource
SLocalReducer *pLocalReducer = (SLocalReducer *)atomic_exchange_ptr(&pRes->pLocalReducer, NULL); SLocalReducer *pLocalReducer = (SLocalReducer *)atomic_exchange_ptr(&pRes->pLocalReducer, NULL);
if (pLocalReducer != NULL) { if (pLocalReducer != NULL) {
int32_t status = 0;
while ((status = atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY,
TSC_LOCALREDUCE_TOBE_FREED)) == TSC_LOCALREDUCE_IN_PROGRESS) {
taosMsleep(100);
tscDebug("%p waiting for delete procedure, status: %d", pSql, status);
}
pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo); pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo);
if (pLocalReducer->pCtx != NULL) { if (pLocalReducer->pCtx != NULL) {
@ -1303,6 +1296,10 @@ void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {// re
for (int32_t i = 0; i < t; ++i) { for (int32_t i = 0; i < t; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
pLocalReducer->pCtx[i].aOutputBuf = pLocalReducer->pResultBuf->data + pExpr->offset * pLocalReducer->resColModel->capacity; pLocalReducer->pCtx[i].aOutputBuf = pLocalReducer->pResultBuf->data + pExpr->offset * pLocalReducer->resColModel->capacity;
if (pExpr->functionId == TSDB_FUNC_TOP || pExpr->functionId == TSDB_FUNC_BOTTOM || pExpr->functionId == TSDB_FUNC_DIFF) {
pLocalReducer->pCtx[i].ptsOutputBuf = pLocalReducer->pCtx[0].aOutputBuf;
}
} }
memset(pLocalReducer->pResultBuf, 0, pLocalReducer->nResultBufSize + sizeof(tFilePage)); memset(pLocalReducer->pResultBuf, 0, pLocalReducer->nResultBufSize + sizeof(tFilePage));
@ -1437,24 +1434,13 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
SLocalReducer *pLocalReducer = pRes->pLocalReducer; SLocalReducer *pLocalReducer = pRes->pLocalReducer;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tFilePage *tmpBuffer = pLocalReducer->pTempBuffer;
// set the data merge in progress
int32_t prevStatus =
atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_IN_PROGRESS);
if (prevStatus != TSC_LOCALREDUCE_READY) {
assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED); // it is in tscDestroyLocalReducer function already
return TSDB_CODE_SUCCESS;
}
tFilePage *tmpBuffer = pLocalReducer->pTempBuffer;
if (doHandleLastRemainData(pSql)) { if (doHandleLastRemainData(pSql)) {
pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (doBuildFilledResultForGroup(pSql)) { if (doBuildFilledResultForGroup(pSql)) {
pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1510,7 +1496,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
pLocalReducer->discardData->num = 0; pLocalReducer->discardData->num = 0;
if (saveGroupResultInfo(pSql)) { if (saveGroupResultInfo(pSql)) {
pLocalReducer->status = TSC_LOCALREDUCE_READY;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1556,7 +1541,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
// here we do not check the return value // here we do not check the return value
adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree); adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree);
assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS);
if (pRes->numOfRows == 0) { if (pRes->numOfRows == 0) {
handleUnprocessedRow(pCmd, pLocalReducer, tmpBuffer); handleUnprocessedRow(pCmd, pLocalReducer, tmpBuffer);
@ -1567,7 +1551,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
* If previous group is not skipped, keep it in pRes->numOfGroups * If previous group is not skipped, keep it in pRes->numOfGroups
*/ */
if (notSkipped && saveGroupResultInfo(pSql)) { if (notSkipped && saveGroupResultInfo(pSql)) {
pLocalReducer->status = TSC_LOCALREDUCE_READY;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1587,7 +1570,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
if (pRes->numOfRows == 0) { if (pRes->numOfRows == 0) {
continue; continue;
} else { } else {
pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else { // result buffer is not full } else { // result buffer is not full
@ -1612,9 +1594,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
genFinalResults(pSql, pLocalReducer, true); genFinalResults(pSql, pLocalReducer, true);
} }
assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS && pRes->row == 0);
pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -731,7 +731,7 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColI
return code; return code;
} }
dataBuf->vgId = pTableMeta->vgroupInfo.vgId; dataBuf->vgId = pTableMeta->vgId;
dataBuf->numOfTables = 1; dataBuf->numOfTables = 1;
*totalNum += numOfRows; *totalNum += numOfRows;

View File

@ -666,6 +666,7 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQ
const char* msg1 = "invalid query expression"; const char* msg1 = "invalid query expression";
const char* msg2 = "interval cannot be less than 10 ms"; const char* msg2 = "interval cannot be less than 10 ms";
const char* msg3 = "sliding cannot be used without interval"; const char* msg3 = "sliding cannot be used without interval";
const char* msg4 = "top/bottom query does not support order by value in interval query";
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
@ -712,6 +713,11 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQ
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
int32_t colId = pQueryInfo->order.orderColId;
if (pQueryInfo->interval.interval > 0 && colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -4646,7 +4652,7 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
if (!(orderByTags || orderByTS) && !isTopBottomQuery(pQueryInfo)) { if (!(orderByTags || orderByTS) && !isTopBottomQuery(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
} else { } else { // order by top/bottom result value column is not supported in case of interval query.
assert(!(orderByTags && orderByTS)); assert(!(orderByTags && orderByTS));
} }
@ -4936,7 +4942,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload; SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId); pUpdateMsg->head.vgId = htonl(pTableMeta->vgId);
pUpdateMsg->tid = htonl(pTableMeta->id.tid); pUpdateMsg->tid = htonl(pTableMeta->id.tid);
pUpdateMsg->uid = htobe64(pTableMeta->id.uid); pUpdateMsg->uid = htobe64(pTableMeta->id.uid);
pUpdateMsg->colId = htons(pTagsSchema->colId); pUpdateMsg->colId = htons(pTagsSchema->colId);

View File

@ -130,13 +130,14 @@ SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
return NULL; return NULL;
} }
static void tscInitCorVgroupInfo(SCorVgroupInfo *corVgroupInfo, SVgroupInfo *vgroupInfo) { static void tscInitCorVgroupInfo(SCorVgroupInfo *corVgroupInfo, SVgroupMsg *pVgroupMsg) {
corVgroupInfo->version = 0; corVgroupInfo->version = 0;
corVgroupInfo->inUse = 0; corVgroupInfo->inUse = 0;
corVgroupInfo->numOfEps = vgroupInfo->numOfEps; corVgroupInfo->numOfEps = pVgroupMsg->numOfEps;
for (int32_t i = 0; i < corVgroupInfo->numOfEps; i++) {
corVgroupInfo->epAddr[i].fqdn = strdup(vgroupInfo->epAddr[i].fqdn); for (int32_t i = 0; i < pVgroupMsg->numOfEps; i++) {
corVgroupInfo->epAddr[i].port = vgroupInfo->epAddr[i].port; corVgroupInfo->epAddr[i].fqdn = strndup(pVgroupMsg->epAddr[i].fqdn, tListLen(pVgroupMsg->epAddr[0].fqdn));
corVgroupInfo->epAddr[i].port = pVgroupMsg->epAddr[i].port;
} }
} }
@ -145,8 +146,10 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
int32_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema); int32_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema);
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize); STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize);
pTableMeta->tableType = pTableMetaMsg->tableType; pTableMeta->tableType = pTableMetaMsg->tableType;
pTableMeta->vgId = pTableMetaMsg->vgroup.vgId;
pTableMeta->tableInfo = (STableComInfo) { pTableMeta->tableInfo = (STableComInfo) {
.numOfTags = pTableMetaMsg->numOfTags, .numOfTags = pTableMetaMsg->numOfTags,
.precision = pTableMetaMsg->precision, .precision = pTableMetaMsg->precision,
@ -156,18 +159,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta->id.tid = pTableMetaMsg->tid; pTableMeta->id.tid = pTableMetaMsg->tid;
pTableMeta->id.uid = pTableMetaMsg->uid; pTableMeta->id.uid = pTableMetaMsg->uid;
SVgroupInfo* pVgroupInfo = &pTableMeta->vgroupInfo; tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMetaMsg->vgroup);
pVgroupInfo->numOfEps = pTableMetaMsg->vgroup.numOfEps;
pVgroupInfo->vgId = pTableMetaMsg->vgroup.vgId;
for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
SEpAddrMsg* pEpMsg = &pTableMetaMsg->vgroup.epAddr[i];
pVgroupInfo->epAddr[i].fqdn = strndup(pEpMsg->fqdn, tListLen(pEpMsg->fqdn));
pVgroupInfo->epAddr[i].port = pEpMsg->port;
}
tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, pVgroupInfo);
pTableMeta->sversion = pTableMetaMsg->sversion; pTableMeta->sversion = pTableMetaMsg->sversion;
pTableMeta->tversion = pTableMetaMsg->tversion; pTableMeta->tversion = pTableMetaMsg->tversion;

View File

@ -45,32 +45,30 @@ static int32_t getWaitingTimeInterval(int32_t count) {
return 0; return 0;
} }
return initial * (2<<(count - 2)); return initial * ((2u)<<(count - 2));
} }
static void tscSetDnodeEpSet(SSqlObj* pSql, SVgroupInfo* pVgroupInfo) { static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) {
assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
SRpcEpSet* pEpSet = &pSql->epSet;
// Issue the query to one of the vnode among a vgroup randomly. // Issue the query to one of the vnode among a vgroup randomly.
// change the inUse property would not affect the isUse attribute of STableMeta // change the inUse property would not affect the isUse attribute of STableMeta
pEpSet->inUse = rand() % pVgroupInfo->numOfEps; pEpSet->inUse = rand() % pVgroupInfo->numOfEps;
// apply the FQDN string length check here // apply the FQDN string length check here
bool hasFqdn = false; bool existed = false;
pEpSet->numOfEps = pVgroupInfo->numOfEps; pEpSet->numOfEps = pVgroupInfo->numOfEps;
for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
pEpSet->port[i] = pVgroupInfo->epAddr[i].port; pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
if (!hasFqdn) { int32_t len = (int32_t) strnlen(pVgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN);
hasFqdn = (strlen(pEpSet->fqdn[i]) > 0); if (len > 0) {
tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
existed = true;
} }
} }
assert(existed);
assert(hasFqdn);
} }
static void tscDumpMgmtEpSet(SSqlObj *pSql) { static void tscDumpMgmtEpSet(SSqlObj *pSql) {
@ -102,7 +100,8 @@ void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) {
pCorEpSet->epSet = *pEpSet; pCorEpSet->epSet = *pEpSet;
taosCorEndWrite(&pCorEpSet->version); taosCorEndWrite(&pCorEpSet->version);
} }
static void tscDumpEpSetFromVgroupInfo(SCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SCorVgroupInfo *pVgroupInfo) {
if (pVgroupInfo == NULL) { return;} if (pVgroupInfo == NULL) { return;}
taosCorBeginRead(&pVgroupInfo->version); taosCorBeginRead(&pVgroupInfo->version);
int8_t inUse = pVgroupInfo->inUse; int8_t inUse = pVgroupInfo->inUse;
@ -515,8 +514,8 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
} else { } else {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId); pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId);
tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId); tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgId);
} }
pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg); pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
@ -535,7 +534,6 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// NOTE: shell message size should not include SMsgDesc // NOTE: shell message size should not include SMsgDesc
int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc); int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
int32_t vgId = pTableMeta->vgroupInfo.vgId;
SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg; SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
pMsgDesc->numOfVnodes = htonl(1); // always one vnode pMsgDesc->numOfVnodes = htonl(1); // always one vnode
@ -543,7 +541,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(SMsgDesc); pMsg += sizeof(SMsgDesc);
SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg; SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
pShellMsg->header.vgId = htonl(vgId); pShellMsg->header.vgId = htonl(pTableMeta->vgId);
pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc
pShellMsg->length = pShellMsg->header.contLen; pShellMsg->length = pShellMsg->header.contLen;
@ -551,9 +549,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// pSql->cmd.payloadLen is set during copying data into payload // pSql->cmd.payloadLen is set during copying data into payload
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &pTableMeta->corVgroupInfo);
tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, pTableMeta->vgId, pSql->cmd.numOfTablesInSubmit,
pSql->epSet.numOfEps); pSql->epSet.numOfEps);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -597,24 +595,28 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) { if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
SVgroupInfo* pVgroupInfo = NULL; int32_t vgId = -1;
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int32_t index = pTableMetaInfo->vgroupIndex; int32_t index = pTableMetaInfo->vgroupIndex;
assert(index >= 0); assert(index >= 0);
SVgroupInfo* pVgroupInfo = NULL;
if (pTableMetaInfo->vgroupList->numOfVgroups > 0) { if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
assert(index < pTableMetaInfo->vgroupList->numOfVgroups); assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index]; pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
} }
vgId = pVgroupInfo->vgId;
tscSetDnodeEpSet(&pSql->epSet, pVgroupInfo);
tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups); tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
} else { } else {
pVgroupInfo = &pTableMeta->vgroupInfo; vgId = pTableMeta->vgId;
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &pTableMeta->corVgroupInfo);
} }
assert(pVgroupInfo != NULL); pSql->epSet.inUse = rand()%pSql->epSet.numOfEps;
tscSetDnodeEpSet(pSql, pVgroupInfo); pQueryMsg->head.vgId = htonl(vgId);
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableMeta->id.tid); pTableIdInfo->tid = htonl(pTableMeta->id.tid);
@ -633,7 +635,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
// set the vgroup info // set the vgroup info
tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo); tscSetDnodeEpSet(&pSql->epSet, &pTableIdList->vgInfo);
pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList); int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList);
@ -1448,48 +1450,11 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &pTableMetaInfo->pTableMeta->corVgroupInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
//int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload;
// pCancelMsg->qhandle = htobe64(pSql->res.qhandle);
//
// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
//
// if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
// int32_t vgIndex = pTableMetaInfo->vgroupIndex;
// if (pTableMetaInfo->pVgroupTables == NULL) {
// SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
// assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
//
// pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
// } else {
// int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
// assert(vgIndex >= 0 && vgIndex < numOfVgroups);
//
// SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
//
// pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
// }
// } else {
// STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
// pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
// tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
// }
//
// pSql->cmd.payloadLen = sizeof(SCancelQueryMsg);
// pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY;
//
// pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg));
// return TSDB_CODE_SUCCESS;
//}
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SAlterDbMsg); pCmd->payloadLen = sizeof(SAlterDbMsg);

View File

@ -31,17 +31,16 @@
#include "tlocale.h" #include "tlocale.h"
// global, not configurable // global, not configurable
SCacheObj* tscMetaCache; SCacheObj *tscMetaCache; // table meta cache
SHashObj *tscHashMap; // hash map to keep the global vgroup info
int tscObjRef = -1; int tscObjRef = -1;
void * tscTmr; void *tscTmr;
void * tscQhandle; void *tscQhandle;
void * tscCheckDiskUsageTmr; void *tscCheckDiskUsageTmr;
int tscRefId = -1; int tscRefId = -1;
int tscNumOfObj = 0; // number of sqlObj in current process.
int tscNumOfThreads;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT; static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
//void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet);
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
taosGetDisk(); taosGetDisk();
@ -114,7 +113,7 @@ void taos_init_imp(void) {
int queueSize = tsMaxConnections*2; int queueSize = tsMaxConnections*2;
double factor = (tscEmbedded == 0)? 2.0:4.0; double factor = (tscEmbedded == 0)? 2.0:4.0;
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor); int32_t tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor);
if (tscNumOfThreads < 2) { if (tscNumOfThreads < 2) {
tscNumOfThreads = 2; tscNumOfThreads = 2;
} }
@ -133,7 +132,8 @@ void taos_init_imp(void) {
int64_t refreshTime = 10; // 10 seconds by default int64_t refreshTime = 10; // 10 seconds by default
if (tscMetaCache == NULL) { if (tscMetaCache == NULL) {
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta"); tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta");
tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj); tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj);
tscHashMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
} }
tscRefId = taosOpenRef(200, tscCloseTscObj); tscRefId = taosOpenRef(200, tscCloseTscObj);

View File

@ -458,21 +458,19 @@ void tscFreeRegisteredSqlObj(void *pSql) {
SSqlObj* p = *(SSqlObj**)pSql; SSqlObj* p = *(SSqlObj**)pSql;
STscObj* pTscObj = p->pTscObj; STscObj* pTscObj = p->pTscObj;
assert(p->self != 0); assert(RID_VALID(p->self));
tscFreeSqlObj(p); tscFreeSqlObj(p);
taosReleaseRef(tscRefId, pTscObj->rid); taosReleaseRef(tscRefId, pTscObj->rid);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfObj, 1);
int32_t total = atomic_sub_fetch_32(&tscNumOfObj, 1);
tscDebug("%p free SqlObj, total in tscObj:%d, total:%d", pSql, num, total);
} }
void tscFreeTableMetaHelper(void *pTableMeta) { void tscFreeTableMetaHelper(void *pTableMeta) {
STableMeta* p = (STableMeta*) pTableMeta; STableMeta* p = (STableMeta*) pTableMeta;
int32_t numOfEps = p->vgroupInfo.numOfEps;
assert(numOfEps >= 0 && numOfEps <= TSDB_MAX_REPLICA);
for(int32_t i = 0; i < numOfEps; ++i) {
tfree(p->vgroupInfo.epAddr[i].fqdn);
}
int32_t numOfEps1 = p->corVgroupInfo.numOfEps; int32_t numOfEps1 = p->corVgroupInfo.numOfEps;
assert(numOfEps1 >= 0 && numOfEps1 <= TSDB_MAX_REPLICA); assert(numOfEps1 >= 0 && numOfEps1 <= TSDB_MAX_REPLICA);
@ -1912,6 +1910,10 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
void registerSqlObj(SSqlObj* pSql) { void registerSqlObj(SSqlObj* pSql) {
taosAcquireRef(tscRefId, pSql->pTscObj->rid); taosAcquireRef(tscRefId, pSql->pTscObj->rid);
pSql->self = taosAddRef(tscObjRef, pSql); pSql->self = taosAddRef(tscObjRef, pSql);
int32_t num = atomic_add_fetch_32(&pSql->pTscObj->numOfObj, 1);
int32_t total = atomic_add_fetch_32(&tscNumOfObj, 1);
tscDebug("%p new SqlObj from %p, total in tscObj:%d, total:%d", pSql, pSql->pTscObj, num, total);
} }
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) { SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {
@ -1941,30 +1943,24 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
return NULL; return NULL;
} }
pNew->fp = fp; pNew->fp = fp;
pNew->fetchFp = fp; pNew->fetchFp = fp;
pNew->param = param; pNew->param = param;
pNew->sqlstr = NULL;
pNew->maxRetry = TSDB_MAX_REPLICA; pNew->maxRetry = TSDB_MAX_REPLICA;
pNew->sqlstr = strdup(pSql->sqlstr);
if (pNew->sqlstr == NULL) {
tscError("%p new subquery failed", pSql);
tscFreeSqlObj(pNew);
return NULL;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, 0);
assert(pSql->cmd.clauseIndex == 0); assert(pSql->cmd.clauseIndex == 0);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
registerSqlObj(pNew); registerSqlObj(pNew);
return pNew; return pNew;
} }
static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pQueryInfo, SQueryInfo* pNewQueryInfo, int64_t uid) { static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pNewQueryInfo, int64_t uid) {
int32_t numOfOutput = (int32_t)tscSqlExprNumOfExprs(pNewQueryInfo); int32_t numOfOutput = (int32_t)tscSqlExprNumOfExprs(pNewQueryInfo);
if (numOfOutput == 0) { if (numOfOutput == 0) {
return; return;
@ -2017,15 +2013,9 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIndex); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIndex);
pNew->pTscObj = pSql->pTscObj; pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew; pNew->signature = pNew;
pNew->sqlstr = NULL;
pNew->sqlstr = strdup(pSql->sqlstr);
if (pNew->sqlstr == NULL) {
tscError("%p new subquery failed, tableIndex:%d, vgroupIndex:%d", pSql, tableIndex, pTableMetaInfo->vgroupIndex);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
SSqlCmd* pnCmd = &pNew->cmd; SSqlCmd* pnCmd = &pNew->cmd;
memcpy(pnCmd, pCmd, sizeof(SSqlCmd)); memcpy(pnCmd, pCmd, sizeof(SSqlCmd));
@ -2113,23 +2103,22 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
goto _error; goto _error;
} }
doSetSqlExprAndResultFieldInfo(pQueryInfo, pNewQueryInfo, uid); doSetSqlExprAndResultFieldInfo(pNewQueryInfo, uid);
pNew->fp = fp; pNew->fp = fp;
pNew->fetchFp = fp; pNew->fetchFp = fp;
pNew->param = param;
pNew->param = param;
pNew->maxRetry = TSDB_MAX_REPLICA; pNew->maxRetry = TSDB_MAX_REPLICA;
char* name = pTableMetaInfo->name; char* name = pTableMetaInfo->name;
STableMetaInfo* pFinalInfo = NULL; STableMetaInfo* pFinalInfo = NULL;
if (pPrevSql == NULL) { if (pPrevSql == NULL) { // get by name may failed due to the cache cleanup
STableMeta* pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMetaInfo->pTableMeta); // get by name may failed due to the cache cleanup STableMeta* pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMetaInfo->pTableMeta);
assert(pTableMeta != NULL); assert(pTableMeta != NULL);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList,
pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables); pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables);
} else { // transfer the ownership of pTableMeta to the newly create sql object. } else { // transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);

@ -1 +1 @@
Subproject commit ec77d9049a719dabfd1a7c1122a209e201861944 Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df

View File

@ -34,17 +34,13 @@ int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
void resetResultRowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); void resetResultRowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo);
void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int32_t num);
void clearClosedResultRows(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pResultRowInfo);
int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo); int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo);
void closeAllResultRows(SResultRowInfo* pResultRowInfo); void closeAllResultRows(SResultRowInfo* pResultRowInfo);
void removeRedundantResultRows(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order);
int32_t initResultRow(SResultRow *pResultRow); int32_t initResultRow(SResultRow *pResultRow);
void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot); void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot);
bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot); bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot);
void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type);
void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type);
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index);

View File

@ -753,11 +753,12 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
} }
static void updateResultRowIndex(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, bool ascQuery) { static void updateResultRowIndex(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, bool ascQuery) {
if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) { if ((pTableQueryInfo->lastKey > pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey < pTableQueryInfo->win.ekey && (!ascQuery))) {
closeAllResultRows(pResultRowInfo); closeAllResultRows(pResultRowInfo);
pResultRowInfo->curIndex = pResultRowInfo->size - 1; pResultRowInfo->curIndex = pResultRowInfo->size - 1;
} else { } else {
doUpdateResultRowIndex(pResultRowInfo, pTableQueryInfo->lastKey, ascQuery); int32_t step = ascQuery? 1:-1;
doUpdateResultRowIndex(pResultRowInfo, pTableQueryInfo->lastKey - step, ascQuery);
} }
} }
@ -1198,8 +1199,12 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
// prev time window not interpolation yet. // prev time window not interpolation yet.
int32_t curIndex = curTimeWindowIndex(pWindowResInfo); int32_t curIndex = curTimeWindowIndex(pWindowResInfo);
if (prevIndex != -1 && prevIndex < curIndex && pRuntimeEnv->timeWindowInterpo) { if (prevIndex != -1 && prevIndex < curIndex && pRuntimeEnv->timeWindowInterpo) {
for(int32_t j = prevIndex; j < curIndex; ++j) { for(int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
SResultRow *pRes = pWindowResInfo->pResult[j]; SResultRow *pRes = pWindowResInfo->pResult[j];
if (pRes->closed) {
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
continue;
}
STimeWindow w = pRes->win; STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &w, masterScan, &pResult, groupId); ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &w, masterScan, &pResult, groupId);
@ -1600,6 +1605,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
if (prevWindowIndex != -1 && prevWindowIndex < curIndex) { if (prevWindowIndex != -1 && prevWindowIndex < curIndex) {
for (int32_t k = prevWindowIndex; k < curIndex; ++k) { for (int32_t k = prevWindowIndex; k < curIndex; ++k) {
SResultRow *pRes = pWindowResInfo->pResult[k]; SResultRow *pRes = pWindowResInfo->pResult[k];
if (pRes->closed) {
assert(resultRowInterpolated(pResult, RESULT_ROW_START_INTERP) && resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
continue;
}
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &pRes->win, masterScan, &pResult, groupId); ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &pRes->win, masterScan, &pResult, groupId);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
@ -1713,10 +1722,6 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock); blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock);
} }
// update the lastkey of current table
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
pTableQueryInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// interval query with limit applied // interval query with limit applied
int32_t numOfRes = 0; int32_t numOfRes = 0;
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
@ -5181,10 +5186,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
scanMultiTableDataBlocks(pQInfo); scanMultiTableDataBlocks(pQInfo);
pQInfo->groupIndex += 1; pQInfo->groupIndex += 1;
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; taosArrayDestroy(s);
// no results generated for current group, continue to try the next group // no results generated for current group, continue to try the next group
taosArrayDestroy(s); SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pWindowResInfo->size <= 0) { if (pWindowResInfo->size <= 0) {
continue; continue;
} }
@ -5211,8 +5216,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQInfo->groupIndex = currentGroupIndex; // restore the group index pQInfo->groupIndex = currentGroupIndex; // restore the group index
assert(pQuery->rec.rows == pWindowResInfo->size); assert(pQuery->rec.rows == pWindowResInfo->size);
resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
clearClosedResultRows(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
break; break;
} }
} else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTsBuf == NULL && !isTSCompQuery(pQuery)) { } else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTsBuf == NULL && !isTSCompQuery(pQuery)) {

View File

@ -313,7 +313,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
// allocate buf // allocate buf
if (availablePage == NULL) { if (availablePage == NULL) {
pi->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES); pi->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES + 2); // add extract bytes in case of zipped buffer increased.
} else { } else {
pi->pData = availablePage; pi->pData = availablePage;
} }

View File

@ -20,18 +20,6 @@
#include "qExecutor.h" #include "qExecutor.h"
#include "qUtil.h" #include "qUtil.h"
static int32_t getResultRowKeyInfo(SResultRow* pResult, int16_t type, char** key, int16_t* bytes) {
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
*key = varDataVal(pResult->key);
*bytes = varDataLen(pResult->key);
} else {
*key = (char*) &pResult->win.skey;
*bytes = tDataTypeDesc[type].nSize;
}
return 0;
}
int32_t getOutputInterResultBufSize(SQuery* pQuery) { int32_t getOutputInterResultBufSize(SQuery* pQuery) {
int32_t size = 0; int32_t size = 0;
@ -99,73 +87,6 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
} }
void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int32_t num) {
if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0 || num == 0) {
return;
}
int32_t numOfClosed = numOfClosedResultRows(pResultRowInfo);
assert(num >= 0 && num <= numOfClosed);
int16_t type = pResultRowInfo->type;
int64_t uid = 0;
char *key = NULL;
int16_t bytes = -1;
for (int32_t i = 0; i < num; ++i) {
SResultRow *pResult = pResultRowInfo->pResult[i];
if (pResult->closed) { // remove the window slot from hash table
getResultRowKeyInfo(pResult, type, &key, &bytes);
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
} else {
break;
}
}
int32_t remain = pResultRowInfo->size - num;
// clear all the closed windows from the window list
for (int32_t k = 0; k < remain; ++k) {
copyResultRow(pRuntimeEnv, pResultRowInfo->pResult[k], pResultRowInfo->pResult[num + k], type);
}
// move the unclosed window in the front of the window list
for (int32_t k = remain; k < pResultRowInfo->size; ++k) {
SResultRow *pWindowRes = pResultRowInfo->pResult[k];
clearResultRow(pRuntimeEnv, pWindowRes, pResultRowInfo->type);
}
pResultRowInfo->size = remain;
for (int32_t k = 0; k < pResultRowInfo->size; ++k) {
SResultRow *pResult = pResultRowInfo->pResult[k];
getResultRowKeyInfo(pResult, type, &key, &bytes);
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
int32_t *p = (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
assert(p != NULL);
int32_t v = (*p - num);
assert(v >= 0 && v <= pResultRowInfo->size);
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t));
}
pResultRowInfo->curIndex = -1;
}
void clearClosedResultRows(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) {
if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0) {
return;
}
int32_t numOfClosed = numOfClosedResultRows(pResultRowInfo);
popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, numOfClosed);
}
int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) { int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) {
int32_t i = 0; int32_t i = 0;
while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) { while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) {
@ -188,40 +109,6 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
} }
} }
/*
* remove the results that are not the FIRST time window that spreads beyond the
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
* NOTE: remove redundant, only when the result set order equals to traverse order
*/
void removeRedundantResultRows(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order) {
assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size);
if (pResultRowInfo->size <= 1) {
return;
}
// get the result order
int32_t resultOrder = (pResultRowInfo->pResult[0]->win.skey < pResultRowInfo->pResult[1]->win.skey)? 1:-1;
if (order != resultOrder) {
return;
}
int32_t i = 0;
if (order == QUERY_ASC_FORWARD_STEP) {
TSKEY ekey = pResultRowInfo->pResult[i]->win.ekey;
while (i < pResultRowInfo->size && (ekey < lastKey)) {
++i;
}
} else if (order == QUERY_DESC_FORWARD_STEP) {
while (i < pResultRowInfo->size && (pResultRowInfo->pResult[i]->win.skey > lastKey)) {
++i;
}
}
if (i < pResultRowInfo->size) {
pResultRowInfo->size = (i + 1);
}
}
bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot) { bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot) {
return (getResultRow(pResultRowInfo, slot)->closed == true); return (getResultRow(pResultRowInfo, slot)->closed == true);
} }
@ -262,47 +149,6 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
} }
} }
/**
* The source window result pos attribution of the source window result does not assign to the destination,
* since the attribute of "Pos" is bound to each window result when the window result is created in the
* disk-based result buffer.
*/
void copyResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *dst, const SResultRow *src, int16_t type) {
dst->numOfRows = src->numOfRows;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
dst->key = realloc(dst->key, varDataTLen(src->key));
varDataCopy(dst->key, src->key);
} else {
dst->win = src->win;
}
dst->closed = src->closed;
int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutput;
for (int32_t i = 0; i < nOutputCols; ++i) {
SResultRowCellInfo *pDst = getResultCell(pRuntimeEnv, dst, i);
SResultRowCellInfo *pSrc = getResultCell(pRuntimeEnv, src, i);
// char *buf = pDst->interResultBuf;
memcpy(pDst, pSrc, sizeof(SResultRowCellInfo) + pRuntimeEnv->pCtx[i].interBufBytes);
// pDst->interResultBuf = buf; // restore the allocated buffer
// copy the result info struct
// memcpy(pDst->interResultBuf, pSrc->interResultBuf, pRuntimeEnv->pCtx[i].interBufBytes);
// copy the output buffer data from src to dst, the position info keep unchanged
tFilePage *dstpage = getResBufPage(pRuntimeEnv->pResultBuf, dst->pageId);
char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst, dstpage);
tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pageId);
char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SResultRow *)src, srcpage);
size_t s = pRuntimeEnv->pQuery->pExpr1[i].bytes;
memcpy(dstBuf, srcBuf, s);
}
}
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index) { SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index) {
assert(index >= 0 && index < pRuntimeEnv->pQuery->numOfOutput); assert(index >= 0 && index < pRuntimeEnv->pQuery->numOfOutput);
return (SResultRowCellInfo*)((char*) pRow->pCellInfo + pRuntimeEnv->rowCellInfoOffset[index]); return (SResultRowCellInfo*)((char*) pRow->pCellInfo + pRuntimeEnv->rowCellInfoOffset[index]);

View File

@ -32,17 +32,18 @@ typedef void (*_hash_free_fn_t)(void *param);
typedef struct SHashNode { typedef struct SHashNode {
struct SHashNode *next; struct SHashNode *next;
uint32_t hashVal; // the hash value of key uint32_t hashVal; // the hash value of key
uint32_t keyLen; // length of the key uint32_t dataLen; // length of data
size_t dataLen; // length of data uint32_t keyLen; // length of the key
int8_t count; // reference count int8_t removed; // flag to indicate removed
int8_t removed; // flag to indicate removed int8_t count; // reference count
char data[]; char data[];
} SHashNode; } SHashNode;
#define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen) #define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen)
#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode)) #define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode))
#define GET_HASH_PNODE(_n) ((char*)(_n) - sizeof(SHashNode)); #define GET_HASH_PNODE(_n) ((char*)(_n) - sizeof(SHashNode));
typedef enum SHashLockTypeE { typedef enum SHashLockTypeE {
HASH_NO_LOCK = 0, HASH_NO_LOCK = 0,
HASH_ENTRY_LOCK = 1, HASH_ENTRY_LOCK = 1,
@ -115,7 +116,7 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
* @param dsize * @param dsize
* @return * @return
*/ */
void* taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize); void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize);
/** /**
* remove item with the specified key * remove item with the specified key

View File

@ -271,10 +271,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
} }
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
return taosHashGetCB(pHashObj, key, keyLen, NULL, NULL, 0); return taosHashGetClone(pHashObj, key, keyLen, NULL, NULL, 0);
} }
void* taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize) { void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize) {
if (pHashObj->size <= 0 || keyLen == 0 || key == NULL) { if (pHashObj->size <= 0 || keyLen == 0 || key == NULL) {
return NULL; return NULL;
} }
@ -654,7 +654,7 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
pNewNode->keyLen = (uint32_t)keyLen; pNewNode->keyLen = (uint32_t)keyLen;
pNewNode->hashVal = hashVal; pNewNode->hashVal = hashVal;
pNewNode->dataLen = dsize; pNewNode->dataLen = (uint32_t) dsize;
pNewNode->count = 1; pNewNode->count = 1;
memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize); memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize);

View File

@ -278,7 +278,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
} }
SCacheDataNode* ptNode = NULL; SCacheDataNode* ptNode = NULL;
taosHashGetCB(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode, sizeof(void*)); taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode, sizeof(void*));
void* pData = (ptNode != NULL)? ptNode->data:NULL; void* pData = (ptNode != NULL)? ptNode->data:NULL;

View File

@ -91,7 +91,7 @@ static void vnodeIncRef(void *ptNode) {
void *vnodeAcquire(int32_t vgId) { void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = NULL; SVnodeObj **ppVnode = NULL;
if (tsVnodesHash != NULL) { if (tsVnodesHash != NULL) {
ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *)); ppVnode = taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *));
} }
if (ppVnode == NULL || *ppVnode == NULL) { if (ppVnode == NULL || *ppVnode == NULL) {

View File

@ -38,7 +38,7 @@ class RestfulInsert:
for i in range(loop): for i in range(loop):
tableID = threadID * tablesPerThread tableID = threadID * tablesPerThread
if tableID + i >= self.numOfTables : break if tableID + i >= self.numOfTables : break
name = 'beijing' if tableID % 2 == 0 else 'shanghai' name = 'beijing' if (tableID + i) % 2 == 0 else 'shanghai'
data = "create table if not exists %s.%s%d using %s.meters tags(%d, '%s')" % (self.dbname, self.tableNamePerfix, tableID + i, self.dbname, tableID + i, name) data = "create table if not exists %s.%s%d using %s.meters tags(%d, '%s')" % (self.dbname, self.tableNamePerfix, tableID + i, self.dbname, tableID + i, name)
response = requests.post(self.url, data, headers = self.header) response = requests.post(self.url, data, headers = self.header)
if response.status_code != 200: if response.status_code != 200:

View File

@ -230,7 +230,7 @@ python3 test.py -f tools/lowaTest.py
python3 test.py -f tools/taosdemoTest2.py python3 test.py -f tools/taosdemoTest2.py
# subscribe # subscribe
python3 test.py -f subscribe/singlemeter.py #python3 test.py -f subscribe/singlemeter.py
#python3 test.py -f subscribe/stability.py #python3 test.py -f subscribe/stability.py
python3 test.py -f subscribe/supertable.py #python3 test.py -f subscribe/supertable.py

View File

@ -36,7 +36,7 @@
./test.sh -f unique/arbitrator/sync_replica2_dropDb.sim ./test.sh -f unique/arbitrator/sync_replica2_dropDb.sim
./test.sh -f unique/arbitrator/sync_replica2_dropTable.sim ./test.sh -f unique/arbitrator/sync_replica2_dropTable.sim
#./test.sh -f unique/arbitrator/sync_replica3_alterTable_add.sim ./test.sh -f unique/arbitrator/sync_replica3_alterTable_add.sim
./test.sh -f unique/arbitrator/sync_replica3_alterTable_drop.sim ./test.sh -f unique/arbitrator/sync_replica3_alterTable_drop.sim
./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim ./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim
./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim ./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim