Merge pull request #5759 from taosdata/master
Merge from master into develop
This commit is contained in:
commit
60e6729d7a
|
@ -328,6 +328,7 @@ typedef struct {
|
|||
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
|
||||
SColumnIndex* pColumnIndex;
|
||||
|
||||
TAOS_FIELD* final;
|
||||
SArithmeticSupport *pArithSup; // support the arithmetic expression calculation on agg functions
|
||||
struct SLocalMerger *pLocalMerger;
|
||||
} SSqlRes;
|
||||
|
|
|
@ -937,6 +937,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
|
|||
return ret;
|
||||
}
|
||||
|
||||
if (sql == NULL) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
code = tscGetTableMetaEx(pSql, pTableMetaInfo, true);
|
||||
if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
|
||||
return code;
|
||||
|
@ -945,6 +949,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
|
|||
} else {
|
||||
sql = sToken.z;
|
||||
|
||||
if (sql == NULL) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
code = tscGetTableMetaEx(pSql, pTableMetaInfo, false);
|
||||
if (pCmd->curSql == NULL) {
|
||||
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS);
|
||||
|
@ -952,10 +960,6 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
|
|||
}
|
||||
|
||||
*sqlstr = sql;
|
||||
|
||||
if (*sqlstr == NULL) {
|
||||
code = TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -144,8 +144,9 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
|
|||
SNewVgroupInfo info = {0};
|
||||
info.numOfEps = pVgroupMsg->numOfEps;
|
||||
info.vgId = pVgroupMsg->vgId;
|
||||
info.inUse = 0;
|
||||
info.inUse = 0; // 0 is the default value of inUse in case of multiple replica
|
||||
|
||||
assert(info.numOfEps >= 1 && info.vgId >= 1);
|
||||
for(int32_t i = 0; i < pVgroupMsg->numOfEps; ++i) {
|
||||
tstrncpy(info.ep[i].fqdn, pVgroupMsg->epAddr[i].fqdn, TSDB_FQDN_LEN);
|
||||
info.ep[i].port = pVgroupMsg->epAddr[i].port;
|
||||
|
|
|
@ -34,6 +34,7 @@ int tscKeepConn[TSDB_SQL_MAX] = {0};
|
|||
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
|
||||
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
|
||||
void tscSaveSubscriptionProgress(void* sub);
|
||||
static int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo);
|
||||
|
||||
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
|
||||
static int32_t getWaitingTimeInterval(int32_t count) {
|
||||
|
@ -78,7 +79,8 @@ static void tscEpSetHtons(SRpcEpSet *s) {
|
|||
for (int32_t i = 0; i < s->numOfEps; i++) {
|
||||
s->port[i] = htons(s->port[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
|
||||
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
|
||||
return false;
|
||||
|
@ -111,19 +113,22 @@ static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgrou
|
|||
}
|
||||
}
|
||||
|
||||
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
|
||||
SSqlCmd *pCmd = &pObj->cmd;
|
||||
static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) {
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||
if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t vgId = pTableMetaInfo->pTableMeta->vgId;
|
||||
int32_t vgId = -1;
|
||||
if (pTableMetaInfo->pTableMeta->tableType == TSDB_SUPER_TABLE) {
|
||||
assert(vgId == 0);
|
||||
return;
|
||||
vgId = extractSTableQueryVgroupId(pTableMetaInfo);
|
||||
} else {
|
||||
vgId = pTableMetaInfo->pTableMeta->vgId;
|
||||
}
|
||||
|
||||
assert(vgId > 0);
|
||||
|
||||
SNewVgroupInfo vgroupInfo = {.vgId = -1};
|
||||
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||
assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0);
|
||||
|
@ -138,6 +143,33 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
|
|||
|
||||
tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
|
||||
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||
|
||||
// Update the local cached epSet info cached by SqlObj
|
||||
int32_t inUse = pSql->epSet.inUse;
|
||||
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
||||
tscDebug("%p update the epSet in SqlObj, in use before:%d, after:%d", pSql, inUse, pSql->epSet.inUse);
|
||||
|
||||
}
|
||||
|
||||
int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo) {
|
||||
assert(pTableMetaInfo != NULL);
|
||||
|
||||
int32_t vgIndex = pTableMetaInfo->vgroupIndex;
|
||||
int32_t vgId = -1;
|
||||
|
||||
if (pTableMetaInfo->pVgroupTables == NULL) {
|
||||
SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
|
||||
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
|
||||
vgId = pVgroupInfo->vgroups[vgIndex].vgId;
|
||||
} else {
|
||||
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
|
||||
assert(vgIndex >= 0 && vgIndex < numOfVgroups);
|
||||
|
||||
SVgroupTableInfo *pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
|
||||
vgId = pTableIdList->vgInfo.vgId;
|
||||
}
|
||||
|
||||
return vgId;
|
||||
}
|
||||
|
||||
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
||||
|
@ -515,21 +547,22 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
int32_t vgIndex = pTableMetaInfo->vgroupIndex;
|
||||
int32_t vgId = -1;
|
||||
|
||||
if (pTableMetaInfo->pVgroupTables == NULL) {
|
||||
SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
|
||||
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
|
||||
|
||||
pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
|
||||
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qId);
|
||||
vgId = pVgroupInfo->vgroups[vgIndex].vgId;
|
||||
} else {
|
||||
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
|
||||
assert(vgIndex >= 0 && vgIndex < numOfVgroups);
|
||||
|
||||
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
|
||||
|
||||
pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
|
||||
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qId);
|
||||
vgId = pTableIdList->vgInfo.vgId;
|
||||
}
|
||||
|
||||
pRetrieveMsg->header.vgId = htonl(vgId);
|
||||
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, vgId, vgIndex, pSql->res.qId);
|
||||
} else {
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId);
|
||||
|
@ -1980,7 +2013,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
|||
(vgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
|
||||
vgroupInfo = createNewVgroupInfo(&pMetaMsg->vgroup);
|
||||
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
|
||||
tscDebug("add new VgroupInfo, vgId:%d, total:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
||||
tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2132,18 +2165,33 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
|
|||
tscError("%p empty vgroup info", pSql);
|
||||
} else {
|
||||
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
|
||||
//just init, no need to lock
|
||||
SVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
|
||||
// just init, no need to lock
|
||||
SVgroupInfo *pVgroup = &pInfo->vgroupList->vgroups[j];
|
||||
|
||||
SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
|
||||
pVgroups->vgId = htonl(vmsg->vgId);
|
||||
pVgroups->numOfEps = vmsg->numOfEps;
|
||||
vmsg->vgId = htonl(vmsg->vgId);
|
||||
vmsg->numOfEps = vmsg->numOfEps;
|
||||
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
|
||||
vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port);
|
||||
}
|
||||
|
||||
assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);
|
||||
SNewVgroupInfo newVi = createNewVgroupInfo(vmsg);
|
||||
pVgroup->numOfEps = newVi.numOfEps;
|
||||
pVgroup->vgId = newVi.vgId;
|
||||
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
|
||||
pVgroup->epAddr[k].port = newVi.ep[k].port;
|
||||
pVgroup->epAddr[k].fqdn = strndup(newVi.ep[k].fqdn, TSDB_FQDN_LEN);
|
||||
}
|
||||
|
||||
for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
|
||||
pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
|
||||
pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
|
||||
// check if current buffer contains the vgroup info.
|
||||
// If not, add it
|
||||
SNewVgroupInfo existVgroupInfo = {.inUse = -1};
|
||||
taosHashGetClone(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), NULL, &existVgroupInfo, sizeof(SNewVgroupInfo));
|
||||
|
||||
if (((existVgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&existVgroupInfo, vmsg)) ||
|
||||
(existVgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
|
||||
taosHashPut(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), &newVi, sizeof(newVi));
|
||||
tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", newVi.vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -405,6 +405,7 @@ int taos_affected_rows(TAOS_RES *tres) {
|
|||
|
||||
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
|
||||
SSqlObj *pSql = (SSqlObj *)res;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
if (pSql == NULL || pSql->signature != pSql) return 0;
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||
|
@ -419,7 +420,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
|
|||
|
||||
SFieldInfo *pFieldInfo = &pQueryInfo->fieldsInfo;
|
||||
|
||||
if (pFieldInfo->final == NULL) {
|
||||
if (pRes->final == NULL) {
|
||||
TAOS_FIELD* f = calloc(pFieldInfo->numOfOutput, sizeof(TAOS_FIELD));
|
||||
|
||||
int32_t j = 0;
|
||||
|
@ -439,10 +440,10 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
|
|||
}
|
||||
}
|
||||
|
||||
pFieldInfo->final = f;
|
||||
pRes->final = f;
|
||||
}
|
||||
|
||||
return pFieldInfo->final;
|
||||
return pRes->final;
|
||||
}
|
||||
|
||||
static bool needToFetchNewBlock(SSqlObj* pSql) {
|
||||
|
|
|
@ -429,6 +429,8 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
|
|||
tfree(pRes->pArithSup->data);
|
||||
tfree(pRes->pArithSup);
|
||||
}
|
||||
|
||||
tfree(pRes->final);
|
||||
|
||||
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
|
||||
}
|
||||
|
@ -1176,7 +1178,6 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
|
|||
}
|
||||
|
||||
taosArrayDestroy(pFieldInfo->internalField);
|
||||
tfree(pFieldInfo->final);
|
||||
|
||||
memset(pFieldInfo, 0, sizeof(SFieldInfo));
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ extern int8_t tsEnableTelemetryReporting;
|
|||
extern char tsEmail[];
|
||||
extern char tsArbitrator[];
|
||||
extern int8_t tsArbOnline;
|
||||
extern int32_t tsDnodeId;
|
||||
|
||||
// common
|
||||
extern int tsRpcTimer;
|
||||
|
|
|
@ -43,6 +43,7 @@ int8_t tsEnableVnodeBak = 1;
|
|||
int8_t tsEnableTelemetryReporting = 1;
|
||||
int8_t tsArbOnline = 0;
|
||||
char tsEmail[TSDB_FQDN_LEN] = {0};
|
||||
int32_t tsDnodeId = 0;
|
||||
|
||||
// common
|
||||
int32_t tsRpcTimer = 1000;
|
||||
|
@ -212,7 +213,7 @@ float tsAvailTmpDirectorySpace = 0;
|
|||
float tsAvailDataDirGB = 0;
|
||||
float tsUsedDataDirGB = 0;
|
||||
float tsReservedTmpDirectorySpace = 1.0f;
|
||||
float tsMinimalDataDirGB = 1.0f;
|
||||
float tsMinimalDataDirGB = 2.0f;
|
||||
int32_t tsTotalMemoryMB = 0;
|
||||
uint32_t tsVersion = 0;
|
||||
|
||||
|
|
|
@ -73,6 +73,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
|
|||
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
|
||||
|
||||
int32_t cqObjRef = -1;
|
||||
int32_t cqVnodeNum = 0;
|
||||
|
||||
void cqRmFromList(SCqObj *pObj) {
|
||||
//LOCK in caller
|
||||
|
@ -166,6 +167,8 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
atomic_add_fetch_32(&cqVnodeNum, 1);
|
||||
|
||||
cqCreateRef();
|
||||
|
||||
pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ");
|
||||
|
@ -240,6 +243,13 @@ void cqClose(void *handle) {
|
|||
if (hasCq == 0) {
|
||||
freeSCqContext(pContext);
|
||||
}
|
||||
|
||||
int32_t remainn = atomic_sub_fetch_32(&cqVnodeNum, 1);
|
||||
if (remainn <= 0) {
|
||||
int32_t ref = cqObjRef;
|
||||
cqObjRef = -1;
|
||||
taosCloseRef(ref);
|
||||
}
|
||||
}
|
||||
|
||||
void cqStart(void *handle) {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "os.h"
|
||||
#include "cJSON.h"
|
||||
#include "dnodeCfg.h"
|
||||
#include "tglobal.h"
|
||||
|
||||
static SDnodeCfg tsCfg = {0};
|
||||
static pthread_mutex_t tsCfgMutex;
|
||||
|
@ -70,6 +71,7 @@ static void dnodeResetCfg(SDnodeCfg *cfg) {
|
|||
|
||||
pthread_mutex_lock(&tsCfgMutex);
|
||||
tsCfg.dnodeId = cfg->dnodeId;
|
||||
tsDnodeId = cfg->dnodeId;
|
||||
tstrncpy(tsCfg.clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN);
|
||||
dnodePrintCfg(cfg);
|
||||
dnodeWriteCfg();
|
||||
|
|
|
@ -92,6 +92,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo);
|
|||
void** qAcquireQInfo(void* pMgmt, uint64_t key);
|
||||
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle);
|
||||
bool checkQIdEqual(void *qHandle, uint64_t qId);
|
||||
int64_t genQueryId(void);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -163,6 +163,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0362) //"Table does not exist")
|
||||
#define TSDB_CODE_MND_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0363) //"Invalid table type in tsdb")
|
||||
#define TSDB_CODE_MND_TOO_MANY_TAGS TAOS_DEF_ERROR_CODE(0, 0x0364) //"Too many tags")
|
||||
#define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x0365) //"Too many columns")
|
||||
#define TSDB_CODE_MND_TOO_MANY_TIMESERIES TAOS_DEF_ERROR_CODE(0, 0x0366) //"Too many time series")
|
||||
#define TSDB_CODE_MND_NOT_SUPER_TABLE TAOS_DEF_ERROR_CODE(0, 0x0367) //"Not super table") // operation only available for super table
|
||||
#define TSDB_CODE_MND_COL_NAME_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x0368) //"Tag name too long")
|
||||
|
|
|
@ -437,14 +437,14 @@ static int32_t mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) {
|
|||
return TAOS_DN_OFF_TIME_ZONE_NOT_MATCH;
|
||||
}
|
||||
|
||||
if (0 != strncasecmp(clusterCfg->locale, tsLocale, strlen(tsLocale))) {
|
||||
mError("\"locale\"[%s - %s] cfg parameters inconsistent", clusterCfg->locale, tsLocale);
|
||||
return TAOS_DN_OFF_LOCALE_NOT_MATCH;
|
||||
}
|
||||
if (0 != strncasecmp(clusterCfg->charset, tsCharset, strlen(tsCharset))) {
|
||||
mError("\"charset\"[%s - %s] cfg parameters inconsistent.", clusterCfg->charset, tsCharset);
|
||||
return TAOS_DN_OFF_CHARSET_NOT_MATCH;
|
||||
}
|
||||
// if (0 != strncasecmp(clusterCfg->locale, tsLocale, strlen(tsLocale))) {
|
||||
// mError("\"locale\"[%s - %s] cfg parameters inconsistent", clusterCfg->locale, tsLocale);
|
||||
// return TAOS_DN_OFF_LOCALE_NOT_MATCH;
|
||||
// }
|
||||
// if (0 != strncasecmp(clusterCfg->charset, tsCharset, strlen(tsCharset))) {
|
||||
// mError("\"charset\"[%s - %s] cfg parameters inconsistent.", clusterCfg->charset, tsCharset);
|
||||
// return TAOS_DN_OFF_CHARSET_NOT_MATCH;
|
||||
// }
|
||||
|
||||
if (clusterCfg->enableBalance != tsEnableBalance) {
|
||||
mError("\"balance\"[%d - %d] cfg parameters inconsistent", clusterCfg->enableBalance, tsEnableBalance);
|
||||
|
|
|
@ -1037,6 +1037,19 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
SCreateTableMsg* pCreate = (SCreateTableMsg*)((char*)pCreate1 + sizeof(SCMCreateTableMsg));
|
||||
|
||||
int16_t numOfTags = htons(pCreate->numOfTags);
|
||||
if (numOfTags > TSDB_MAX_TAGS) {
|
||||
mError("msg:%p, app:%p table:%s, failed to create, too many tags", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
|
||||
return TSDB_CODE_MND_TOO_MANY_TAGS;
|
||||
}
|
||||
|
||||
int16_t numOfColumns = htons(pCreate->numOfColumns);
|
||||
int32_t numOfCols = numOfColumns + numOfTags;
|
||||
if (numOfCols > TSDB_MAX_COLUMNS) {
|
||||
mError("msg:%p, app:%p table:%s, failed to create, too many columns", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
|
||||
return TSDB_CODE_MND_TOO_MANY_COLUMNS;
|
||||
}
|
||||
|
||||
SSTableObj * pStable = calloc(1, sizeof(SSTableObj));
|
||||
if (pStable == NULL) {
|
||||
mError("msg:%p, app:%p table:%s, failed to create, no enough memory", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
|
||||
|
@ -1050,10 +1063,9 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
pStable->uid = (us << 24) + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
|
||||
pStable->sversion = 0;
|
||||
pStable->tversion = 0;
|
||||
pStable->numOfColumns = htons(pCreate->numOfColumns);
|
||||
pStable->numOfTags = htons(pCreate->numOfTags);
|
||||
pStable->numOfColumns = numOfColumns;
|
||||
pStable->numOfTags = numOfTags;
|
||||
|
||||
int32_t numOfCols = pStable->numOfColumns + pStable->numOfTags;
|
||||
int32_t schemaSize = numOfCols * sizeof(SSchema);
|
||||
pStable->schema = (SSchema *)calloc(1, schemaSize);
|
||||
if (pStable->schema == NULL) {
|
||||
|
@ -1064,11 +1076,6 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
|
||||
|
||||
if (pStable->numOfColumns > TSDB_MAX_COLUMNS || pStable->numOfTags > TSDB_MAX_TAGS) {
|
||||
mError("msg:%p, app:%p table:%s, failed to create, too many columns", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
}
|
||||
|
||||
pStable->nextColId = 0;
|
||||
|
||||
for (int32_t col = 0; col < numOfCols; col++) {
|
||||
|
@ -1340,6 +1347,11 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
|
|||
return TSDB_CODE_MND_APP_ERROR;
|
||||
}
|
||||
|
||||
if (pStable->numOfColumns + ncols + pStable->numOfTags > TSDB_MAX_COLUMNS) {
|
||||
mError("msg:%p, app:%p stable:%s, add column, too many columns", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId);
|
||||
return TSDB_CODE_MND_TOO_MANY_COLUMNS;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ncols; i++) {
|
||||
if (mnodeFindSuperTableColumnIndex(pStable, schema[i].name) > 0) {
|
||||
mError("msg:%p, app:%p stable:%s, add column, column:%s already exist", pMsg, pMsg->rpcMsg.ahandle,
|
||||
|
|
|
@ -994,6 +994,7 @@ void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) {
|
|||
mDebug("vgId:%d, send sync all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes,
|
||||
pVgroup->dbName);
|
||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
if (pVgroup->vnodeGid[i].role != TAOS_SYNC_ROLE_SLAVE) continue;
|
||||
SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
|
||||
mDebug("vgId:%d, index:%d, send sync vnode msg to dnode %s", pVgroup->vgId, i,
|
||||
pVgroup->vnodeGid[i].pDnode->dnodeEp);
|
||||
|
|
|
@ -105,6 +105,30 @@ int32_t getMaximumIdleDurationSec() {
|
|||
return tsShellActivityTimer * 2;
|
||||
}
|
||||
|
||||
|
||||
int64_t genQueryId(void) {
|
||||
int64_t uid = 0;
|
||||
int64_t did = tsDnodeId;
|
||||
|
||||
uid = did << 54;
|
||||
|
||||
int64_t pid = ((int64_t)taosGetPId()) & 0x3FF;
|
||||
|
||||
uid |= pid << 44;
|
||||
|
||||
int64_t ts = taosGetTimestampMs() & 0x1FFFFFFFF;
|
||||
|
||||
uid |= ts << 11;
|
||||
|
||||
int64_t sid = atomic_add_fetch_64(&queryHandleId, 1) & 0x7FF;
|
||||
|
||||
uid |= sid;
|
||||
|
||||
return uid;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
|
||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
if (pQuery->interval.intervalUnit != 'n' && pQuery->interval.intervalUnit != 'y') {
|
||||
|
@ -6445,6 +6469,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
|
|||
goto _cleanup_qinfo;
|
||||
}
|
||||
|
||||
pQInfo->qId = *qId;
|
||||
|
||||
// to make sure third party won't overwrite this structure
|
||||
pQInfo->signature = pQInfo;
|
||||
SQuery* pQuery = &pQInfo->query;
|
||||
|
@ -6581,8 +6607,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
|
|||
// todo refactor
|
||||
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
|
||||
|
||||
pQInfo->qId = atomic_add_fetch_64(&queryHandleId, 1);
|
||||
*qId = pQInfo->qId;
|
||||
qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
|
||||
return pQInfo;
|
||||
|
||||
|
|
|
@ -124,7 +124,7 @@ bool greaterEqualOperator(SColumnFilterElem *pFilter, const char *minval, const
|
|||
bool equalOperator(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) {
|
||||
SColumnFilterInfo *pFilterInfo = &pFilter->filterInfo;
|
||||
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
int64_t minv = -1, maxv = -1;
|
||||
GET_TYPED_DATA(minv, int64_t, type, minval);
|
||||
GET_TYPED_DATA(maxv, int64_t, type, maxval);
|
||||
|
@ -202,7 +202,7 @@ bool likeOperator(SColumnFilterElem *pFilter, const char *minval, const char *ma
|
|||
bool notEqualOperator(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) {
|
||||
SColumnFilterInfo *pFilterInfo = &pFilter->filterInfo;
|
||||
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
int64_t minv = -1, maxv = -1;
|
||||
GET_TYPED_DATA(minv, int64_t, type, minval);
|
||||
GET_TYPED_DATA(maxv, int64_t, type, maxval);
|
||||
|
|
|
@ -287,6 +287,10 @@ static void lruListMoveToFront(SList *pList, SPageInfo* pi) {
|
|||
tdListPrependNode(pList, pi->pn);
|
||||
}
|
||||
|
||||
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) {
|
||||
return pageSize + POINTER_BYTES + 2 + sizeof(tFilePage);
|
||||
}
|
||||
|
||||
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
|
||||
pResultBuf->statis.getPages += 1;
|
||||
|
||||
|
@ -311,7 +315,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
|
|||
|
||||
// allocate buf
|
||||
if (availablePage == NULL) {
|
||||
pi->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES + 2); // add extract bytes in case of zipped buffer increased.
|
||||
pi->pData = calloc(1, getAllocPageSize(pResultBuf->pageSize)); // add extract bytes in case of zipped buffer increased.
|
||||
} else {
|
||||
pi->pData = availablePage;
|
||||
}
|
||||
|
@ -355,7 +359,7 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
|
|||
}
|
||||
|
||||
if (availablePage == NULL) {
|
||||
(*pi)->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES);
|
||||
(*pi)->pData = calloc(1, getAllocPageSize(pResultBuf->pageSize));
|
||||
} else {
|
||||
(*pi)->pData = availablePage;
|
||||
}
|
||||
|
|
|
@ -197,6 +197,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
|
||||
SQInfo *pQInfo = (SQInfo *)qinfo;
|
||||
assert(pQInfo && pQInfo->signature == pQInfo);
|
||||
|
|
|
@ -295,7 +295,7 @@ void *rpcOpen(const SRpcInit *pInit) {
|
|||
return NULL;
|
||||
}
|
||||
} else {
|
||||
pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime);
|
||||
pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime*30);
|
||||
if ( pRpc->pCache == NULL ) {
|
||||
tError("%s failed to init connection cache", pRpc->label);
|
||||
rpcClose(pRpc);
|
||||
|
@ -470,7 +470,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
|
|||
taosTmrStopA(&pConn->pTimer);
|
||||
|
||||
// set the idle timer to monitor the activity
|
||||
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
||||
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime*30, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
||||
rpcSendMsgToPeer(pConn, msg, msgLen);
|
||||
|
||||
// if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured
|
||||
|
@ -1367,7 +1367,7 @@ static void rpcProcessConnError(void *param, void *id) {
|
|||
|
||||
tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle);
|
||||
|
||||
if (pContext->numOfTry >= pContext->epSet.numOfEps) {
|
||||
if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) {
|
||||
rpcMsg.msgType = pContext->msgType+1;
|
||||
rpcMsg.ahandle = pContext->ahandle;
|
||||
rpcMsg.code = pContext->code;
|
||||
|
|
|
@ -409,23 +409,22 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force)
|
|||
syncReleaseNode(pNode);
|
||||
}
|
||||
|
||||
#if 1
|
||||
void syncRecover(int64_t rid) {
|
||||
SSyncPeer *pPeer;
|
||||
|
||||
SSyncNode *pNode = syncAcquireNode(rid);
|
||||
if (pNode == NULL) return;
|
||||
|
||||
// to do: add a few lines to check if recover is OK
|
||||
// if take this node to unsync state, the whole system may not work
|
||||
|
||||
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
|
||||
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
|
||||
nodeVersion = 0;
|
||||
|
||||
pthread_mutex_lock(&pNode->mutex);
|
||||
|
||||
nodeVersion = 0;
|
||||
|
||||
for (int32_t i = 0; i < pNode->replica; ++i) {
|
||||
if (i == pNode->selfIndex) continue;
|
||||
|
||||
pPeer = pNode->peerInfo[i];
|
||||
if (pPeer->peerFd >= 0) {
|
||||
syncRestartConnection(pPeer);
|
||||
|
@ -436,7 +435,6 @@ void syncRecover(int64_t rid) {
|
|||
|
||||
syncReleaseNode(pNode);
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) {
|
||||
SSyncNode *pNode = syncAcquireNode(rid);
|
||||
|
@ -1000,17 +998,24 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
|
|||
|
||||
sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
|
||||
|
||||
int32_t code = 0;
|
||||
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
|
||||
// nodeVersion = pHead->version;
|
||||
(*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
|
||||
code = (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
|
||||
} else {
|
||||
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
|
||||
syncSaveIntoBuffer(pPeer, pHead);
|
||||
code = syncSaveIntoBuffer(pPeer, pHead);
|
||||
} else {
|
||||
sError("%s, forward discarded since sstatus:%s, hver:%" PRIu64, pPeer->id, syncStatus[nodeSStatus],
|
||||
pHead->version);
|
||||
code = -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (code != 0) {
|
||||
sError("%s, failed to process fwd msg, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
|
||||
syncRestartConnection(pPeer);
|
||||
}
|
||||
}
|
||||
|
||||
static void syncProcessPeersStatusMsg(SPeersStatus *pPeersStatus, SSyncPeer *pPeer) {
|
||||
|
|
|
@ -2861,12 +2861,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
|
|||
if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
|
||||
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
|
||||
}
|
||||
|
||||
SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i);
|
||||
if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst;
|
||||
pHandle->statis[i].max = pBlockInfo->compBlock->keyLast;
|
||||
}
|
||||
}
|
||||
|
||||
int64_t elapsed = taosGetTimestampUs() - stime;
|
||||
|
|
|
@ -175,6 +175,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_ID, "Table name too long")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_NAME, "Table does not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_TYPE, "Invalid table type in tsdb")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, "Too many tags")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_COLUMNS, "Too many columns")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TIMESERIES, "Too many time series")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_SUPER_TABLE, "Not super table") // operation only available for super table
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COL_NAME_TOO_LONG, "Tag name too long")
|
||||
|
|
|
@ -37,6 +37,7 @@ extern int32_t vDebugFlag;
|
|||
typedef struct {
|
||||
int32_t vgId; // global vnode group ID
|
||||
int32_t refCount; // reference count
|
||||
int64_t queuedWMsgSize;
|
||||
int32_t queuedWMsg;
|
||||
int32_t queuedRMsg;
|
||||
int32_t flowctrlLevel;
|
||||
|
|
|
@ -99,8 +99,13 @@ int32_t vnodeSync(int32_t vgId) {
|
|||
return TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
}
|
||||
|
||||
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||
if (pVnode->role == TAOS_SYNC_ROLE_SLAVE) {
|
||||
vInfo("vgId:%d, vnode will sync, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
||||
|
||||
pVnode->version = 0;
|
||||
pVnode->fversion = 0;
|
||||
walResetVersion(pVnode->wal, pVnode->fversion);
|
||||
|
||||
syncRecover(pVnode->sync);
|
||||
}
|
||||
|
||||
|
|
|
@ -208,6 +208,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) {
|
|||
pRsp->completed = true;
|
||||
}
|
||||
|
||||
|
||||
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
||||
void * pCont = pRead->pCont;
|
||||
int32_t contLen = pRead->contLen;
|
||||
|
@ -226,7 +227,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
|
||||
if (contLen != 0) {
|
||||
qinfo_t pQInfo = NULL;
|
||||
uint64_t qId = 0;
|
||||
uint64_t qId = genQueryId();
|
||||
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId);
|
||||
|
||||
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "vnodeStatus.h"
|
||||
|
||||
#define MAX_QUEUED_MSG_NUM 100000
|
||||
#define MAX_QUEUED_MSG_SIZE 1024*1024*1024 //1GB
|
||||
|
||||
extern void * tsDnodeTmr;
|
||||
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
|
||||
|
@ -269,6 +270,13 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
|
|||
}
|
||||
}
|
||||
|
||||
if (tsAvailDataDirGB <= tsMinimalDataDirGB) {
|
||||
vError("vgId:%d, failed to write into vwqueue since no diskspace, avail:%fGB", pVnode->vgId, tsAvailDataDirGB);
|
||||
taosFreeQitem(pWrite);
|
||||
vnodeRelease(pVnode);
|
||||
return TSDB_CODE_VND_NO_DISKSPACE;
|
||||
}
|
||||
|
||||
if (!vnodeInReadyOrUpdatingStatus(pVnode)) {
|
||||
vError("vgId:%d, failed to write into vwqueue, vstatus is %s, refCount:%d pVnode:%p", pVnode->vgId,
|
||||
vnodeStatus[pVnode->status], pVnode->refCount, pVnode);
|
||||
|
@ -278,14 +286,17 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
|
|||
}
|
||||
|
||||
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
|
||||
if (queued > MAX_QUEUED_MSG_NUM) {
|
||||
int64_t queuedSize = atomic_add_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
|
||||
|
||||
if (queued > MAX_QUEUED_MSG_NUM || queuedSize > MAX_QUEUED_MSG_SIZE) {
|
||||
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
|
||||
if (ms > 100) ms = 100;
|
||||
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control %dms", pVnode->vgId, queued, ms);
|
||||
taosMsleep(ms);
|
||||
}
|
||||
|
||||
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
|
||||
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d size:%" PRId64, pVnode->vgId, pVnode->refCount,
|
||||
pVnode->queuedWMsg, pVnode->queuedWMsgSize);
|
||||
|
||||
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -308,7 +319,10 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
|
|||
SVnodeObj *pVnode = vparam;
|
||||
|
||||
int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
|
||||
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, queued);
|
||||
int64_t queuedSize = atomic_sub_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
|
||||
|
||||
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d size:%" PRId64, pVnode->vgId, pWrite,
|
||||
pWrite->rpcMsg.ahandle, queued, queuedSize);
|
||||
|
||||
taosFreeQitem(pWrite);
|
||||
vnodeRelease(pVnode);
|
||||
|
@ -344,7 +358,9 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
|
|||
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
|
||||
SVnodeObj *pVnode = pWrite->pVnode;
|
||||
if (pWrite->qtype != TAOS_QTYPE_RPC) return 0;
|
||||
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->flowctrlLevel <= 0) return 0;
|
||||
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->queuedWMsgSize < MAX_QUEUED_MSG_SIZE &&
|
||||
pVnode->flowctrlLevel <= 0)
|
||||
return 0;
|
||||
|
||||
if (tsEnableFlowCtrl == 0) {
|
||||
int32_t ms = (int32_t)pow(2, pVnode->flowctrlLevel + 2);
|
||||
|
|
|
@ -28,7 +28,8 @@
|
|||
|
||||
int points = 5;
|
||||
int numOfTables = 3;
|
||||
int tablesProcessed = 0;
|
||||
int tablesInsertProcessed = 0;
|
||||
int tablesSelectProcessed = 0;
|
||||
int64_t st, et;
|
||||
|
||||
typedef struct {
|
||||
|
@ -134,6 +135,9 @@ int main(int argc, char *argv[])
|
|||
gettimeofday(&systemTime, NULL);
|
||||
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
|
||||
tablesInsertProcessed = 0;
|
||||
tablesSelectProcessed = 0;
|
||||
|
||||
for (i = 0; i<numOfTables; ++i) {
|
||||
// insert records in asynchronous API
|
||||
sprintf(sql, "insert into %s values(%ld, 0)", tableList[i].name, 1546300800000 + i);
|
||||
|
@ -143,10 +147,20 @@ int main(int argc, char *argv[])
|
|||
printf("once insert finished, presse any key to query\n");
|
||||
getchar();
|
||||
|
||||
while(1) {
|
||||
if (tablesInsertProcessed < numOfTables) {
|
||||
printf("wait for process finished\n");
|
||||
sleep(1);
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
printf("start to query...\n");
|
||||
gettimeofday(&systemTime, NULL);
|
||||
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
tablesProcessed = 0;
|
||||
|
||||
|
||||
for (i = 0; i < numOfTables; ++i) {
|
||||
// select records in asynchronous API
|
||||
|
@ -157,14 +171,8 @@ int main(int argc, char *argv[])
|
|||
printf("\nonce finished, press any key to exit\n");
|
||||
getchar();
|
||||
|
||||
for (i = 0; i<numOfTables; ++i) {
|
||||
printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
|
||||
}
|
||||
|
||||
getchar();
|
||||
|
||||
while(1) {
|
||||
if (tablesProcessed < numOfTables) {
|
||||
if (tablesSelectProcessed < numOfTables) {
|
||||
printf("wait for process finished\n");
|
||||
sleep(1);
|
||||
continue;
|
||||
|
@ -173,6 +181,10 @@ int main(int argc, char *argv[])
|
|||
break;
|
||||
}
|
||||
|
||||
for (i = 0; i<numOfTables; ++i) {
|
||||
printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
|
||||
}
|
||||
|
||||
taos_close(taos);
|
||||
free(tableList);
|
||||
|
||||
|
@ -214,8 +226,8 @@ void taos_insert_call_back(void *param, TAOS_RES *tres, int code)
|
|||
}
|
||||
else {
|
||||
printf("%d rows data are inserted into %s\n", points, pTable->name);
|
||||
tablesProcessed++;
|
||||
if (tablesProcessed >= numOfTables) {
|
||||
tablesInsertProcessed++;
|
||||
if (tablesInsertProcessed >= numOfTables) {
|
||||
gettimeofday(&systemTime, NULL);
|
||||
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
printf("%lld mseconds to insert %d data points\n", (et - st) / 1000, points*numOfTables);
|
||||
|
@ -251,15 +263,17 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
|
|||
//taos_free_result(tres);
|
||||
printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name);
|
||||
|
||||
tablesProcessed++;
|
||||
if (tablesProcessed >= numOfTables) {
|
||||
tablesSelectProcessed++;
|
||||
if (tablesSelectProcessed >= numOfTables) {
|
||||
gettimeofday(&systemTime, NULL);
|
||||
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
printf("%lld mseconds to query %d data rows\n", (et - st) / 1000, points * numOfTables);
|
||||
}
|
||||
|
||||
taos_free_result(tres);
|
||||
}
|
||||
|
||||
taos_free_result(tres);
|
||||
|
||||
}
|
||||
|
||||
void taos_select_call_back(void *param, TAOS_RES *tres, int code)
|
||||
|
@ -276,6 +290,4 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code)
|
|||
taos_cleanup();
|
||||
exit(1);
|
||||
}
|
||||
|
||||
taos_free_result(tres);
|
||||
}
|
||||
|
|
|
@ -258,6 +258,8 @@ python3 test.py -f subscribe/singlemeter.py
|
|||
#python3 test.py -f subscribe/stability.py
|
||||
python3 test.py -f subscribe/supertable.py
|
||||
|
||||
# topic
|
||||
python3 ./test.py -f topic/topicQuery.py
|
||||
|
||||
#======================p3-end===============
|
||||
#======================p4-start===============
|
||||
|
|
|
@ -43,6 +43,9 @@ class TDTestCase:
|
|||
tdSql.query("select * from tb")
|
||||
tdSql.checkRows(insertRows + 4)
|
||||
|
||||
# test case for https://jira.taosdata.com:18080/browse/TD-3716:
|
||||
tdSql.error("insert into tb(now, 1)")
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from util.log import tdLog
|
||||
from util.cases import tdCases
|
||||
from util.sql import tdSql
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
self.ts = 1538548685000
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
|
||||
# test case for https://jira.taosdata.com:18080/browse/TD-3679
|
||||
print("==============step1")
|
||||
tdSql.execute(
|
||||
"create topic tq_test partitions 10")
|
||||
tdSql.execute(
|
||||
"insert into tq_test.p1(off, ts, content) values(0, %d, 'aaaa')" % self.ts)
|
||||
tdSql.execute(
|
||||
"insert into tq_test.p1(off, ts, content) values(1, %d, 'aaaa')" % (self.ts + 1))
|
||||
tdSql.execute(
|
||||
"insert into tq_test.p1(off, ts, content) values(2, %d, 'aaaa')" % (self.ts + 2))
|
||||
tdSql.execute(
|
||||
"insert into tq_test.p1(off, ts, content) values(3, %d, 'aaaa')" % (self.ts + 3))
|
||||
|
||||
print("==============step2")
|
||||
|
||||
tdSql.query("select * from tq_test.p1")
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query("select * from tq_test.p1 where ts >= %d" % self.ts)
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query("select * from tq_test.p1 where ts > %d" % self.ts)
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query("select * from tq_test.p1 where ts = %d" % self.ts)
|
||||
tdSql.checkRows(1)
|
||||
|
||||
|
||||
tdSql.execute("use db")
|
||||
tdSql.execute("create table test(ts timestamp, start timestamp, value int)")
|
||||
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts, self.ts))
|
||||
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 1, self.ts + 1))
|
||||
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 2, self.ts + 2))
|
||||
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 3, self.ts + 3))
|
||||
|
||||
tdSql.query("select * from test")
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query("select * from test where ts >= %d" % self.ts)
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query("select * from test where ts > %d" % self.ts)
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query("select * from test where ts = %d" % self.ts)
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.query("select * from test where start >= %d" % self.ts)
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query("select * from test where start > %d" % self.ts)
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query("select * from test where start = %d" % self.ts)
|
||||
tdSql.checkRows(1)
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue