Merge branch 'develop' of https://github.com/taosdata/TDengine into develop

This commit is contained in:
Tao Liu 2020-07-13 06:18:15 +00:00
commit 1ab19463eb
28 changed files with 206 additions and 109 deletions

View File

@ -415,7 +415,7 @@ int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
//void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column); //void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column);
static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) { static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
SFieldSupInfo* pInfo = TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex); SFieldSupInfo* pInfo = (SFieldSupInfo*) TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex);
assert(pInfo->pSqlExpr != NULL); assert(pInfo->pSqlExpr != NULL);
int32_t type = pInfo->pSqlExpr->resType; int32_t type = pInfo->pSqlExpr->resType;

View File

@ -43,7 +43,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->signature = pSql; pSql->signature = pSql;
pSql->param = param; pSql->param = param;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM; pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->fp = fp; pSql->fp = fp;
pSql->sqlstr = calloc(1, sqlLen + 1); pSql->sqlstr = calloc(1, sqlLen + 1);

View File

@ -497,7 +497,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
pSql->signature = pSql; pSql->signature = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM; pSql->maxRetry = TSDB_MAX_REPLICA;
pStmt->pSql = pSql; pStmt->pSql = pSql;
return pStmt; return pStmt;

View File

@ -5489,9 +5489,9 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) {
} }
if (pCreate->replications != -1 && if (pCreate->replications != -1 &&
(pCreate->replications < TSDB_MIN_REPLICA_NUM || pCreate->replications > TSDB_MAX_REPLICA_NUM)) { (pCreate->replications < TSDB_MIN_DB_REPLICA_OPTION || pCreate->replications > TSDB_MAX_DB_REPLICA_OPTION)) {
snprintf(msg, tListLen(msg), "invalid db option replications: %d valid range: [%d, %d]", pCreate->replications, snprintf(msg, tListLen(msg), "invalid db option replications: %d valid range: [%d, %d]", pCreate->replications,
TSDB_MIN_REPLICA_NUM, TSDB_MAX_REPLICA_NUM); TSDB_MIN_DB_REPLICA_OPTION, TSDB_MAX_DB_REPLICA_OPTION);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
} }

View File

@ -113,7 +113,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->signature = pSql; pSql->signature = pSql;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM; pSql->maxRetry = TSDB_MAX_REPLICA;
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
pObj->pDnodeConn = pDnodeConn; pObj->pDnodeConn = pDnodeConn;

View File

@ -107,7 +107,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
pSql->signature = pSql; pSql->signature = pSql;
pSql->param = pSql; pSql->param = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM; pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->fp = asyncCallback; pSql->fp = asyncCallback;
int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);

View File

@ -550,7 +550,7 @@ static bool checkForDuplicateTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1,
return true; return true;
} }
static void getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) { static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
tscDebug("%p all subqueries retrieve <tid, tags> complete, do tags match", pParentSql); tscDebug("%p all subqueries retrieve <tid, tags> complete, do tags match", pParentSql);
SJoinSupporter* p1 = pParentSql->pSubs[0]->param; SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
@ -568,10 +568,7 @@ static void getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParent
*s2 = taosArrayInit(p2->num, p2->tagSize); *s2 = taosArrayInit(p2->num, p2->tagSize);
if (!(checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) && checkForDuplicateTagVal(pQueryInfo, p2, pParentSql))) { if (!(checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) && checkForDuplicateTagVal(pQueryInfo, p2, pParentSql))) {
freeJoinSubqueryObj(pParentSql); return TSDB_CODE_QRY_DUP_JOIN_KEY;
pParentSql->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
tscQueueAsyncRes(pParentSql);
return;
} }
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
@ -594,6 +591,8 @@ static void getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParent
i++; i++;
} }
} }
return TSDB_CODE_SUCCESS;
} }
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) { static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
@ -680,7 +679,14 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
} }
SArray *s1 = NULL, *s2 = NULL; SArray *s1 = NULL, *s2 = NULL;
getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2); int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
if (code != TSDB_CODE_SUCCESS) {
freeJoinSubqueryObj(pParentSql);
pParentSql->res.code = code;
tscQueueAsyncRes(pParentSql);
return;
}
if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return. if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql); tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql); freeJoinSubqueryObj(pParentSql);

View File

@ -1649,7 +1649,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
pNew->fp = fp; pNew->fp = fp;
pNew->param = param; pNew->param = param;
pNew->maxRetry = TSDB_MAX_REPLICA_NUM; pNew->maxRetry = TSDB_MAX_REPLICA;
pNew->sqlstr = strdup(pSql->sqlstr); pNew->sqlstr = strdup(pSql->sqlstr);
if (pNew->sqlstr == NULL) { if (pNew->sqlstr == NULL) {
@ -1804,7 +1804,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNew->fp = fp; pNew->fp = fp;
pNew->param = param; pNew->param = param;
pNew->maxRetry = TSDB_MAX_REPLICA_NUM; pNew->maxRetry = TSDB_MAX_REPLICA;
char* name = pTableMetaInfo->name; char* name = pTableMetaInfo->name;
STableMetaInfo* pFinalInfo = NULL; STableMetaInfo* pFinalInfo = NULL;

View File

@ -110,7 +110,7 @@ int16_t tsCommitTime = TSDB_DEFAULT_COMMIT_TIME; // seconds
int32_t tsTimePrecision = TSDB_DEFAULT_PRECISION; int32_t tsTimePrecision = TSDB_DEFAULT_PRECISION;
int16_t tsCompression = TSDB_DEFAULT_COMP_LEVEL; int16_t tsCompression = TSDB_DEFAULT_COMP_LEVEL;
int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL; int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL;
int32_t tsReplications = TSDB_DEFAULT_REPLICA_NUM; int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION;
int32_t tsMaxVgroupsPerDb = 0; int32_t tsMaxVgroupsPerDb = 0;
int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES; int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES;
// balance // balance
@ -706,8 +706,8 @@ static void doInitGlobalConfig() {
cfg.ptr = &tsReplications; cfg.ptr = &tsReplications;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MIN_REPLICA_NUM; cfg.minValue = TSDB_MIN_DB_REPLICA_OPTION;
cfg.maxValue = TSDB_MAX_REPLICA_NUM; cfg.maxValue = TSDB_MAX_DB_REPLICA_OPTION;
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);

View File

@ -232,8 +232,9 @@ static void *dnodeProcessWriteQueue(void *param) {
pHead->msgType = pWrite->rpcMsg.msgType; pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0; pHead->version = 0;
pHead->len = pWrite->contLen; pHead->len = pWrite->contLen;
dDebug("%p, msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]);
} else { } else {
dDebug("%p, wal msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]);
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
} }

View File

@ -332,9 +332,9 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_WAL_LEVEL 2 #define TSDB_MAX_WAL_LEVEL 2
#define TSDB_DEFAULT_WAL_LEVEL 1 #define TSDB_DEFAULT_WAL_LEVEL 1
#define TSDB_MIN_REPLICA_NUM 1 #define TSDB_MIN_DB_REPLICA_OPTION 1
#define TSDB_MAX_REPLICA_NUM 3 #define TSDB_MAX_DB_REPLICA_OPTION 3
#define TSDB_DEFAULT_REPLICA_NUM 1 #define TSDB_DEFAULT_DB_REPLICA_OPTION 1
#define TSDB_MAX_JOIN_TABLE_NUM 5 #define TSDB_MAX_JOIN_TABLE_NUM 5
#define TSDB_MAX_UNION_CLAUSE 5 #define TSDB_MAX_UNION_CLAUSE 5

View File

@ -200,6 +200,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb inval
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "tsdb need to reconfigure table") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "tsdb need to reconfigure table")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, 0, 0x0612, "tsdb create table information")
// query // query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle")

View File

@ -646,7 +646,7 @@ typedef struct SCMSTableVgroupMsg {
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int8_t numOfIps; int8_t numOfIps;
SIpAddr ipAddr[TSDB_MAX_REPLICA_NUM]; SIpAddr ipAddr[TSDB_MAX_REPLICA];
} SCMVgroupInfo; } SCMVgroupInfo;
typedef struct { typedef struct {

View File

@ -94,6 +94,7 @@ void sdbDecRef(void *thandle, void *pRow);
int64_t sdbGetNumOfRows(void *handle); int64_t sdbGetNumOfRows(void *handle);
int32_t sdbGetId(void *handle); int32_t sdbGetId(void *handle);
uint64_t sdbGetVersion(); uint64_t sdbGetVersion();
bool sdbCheckRowDeleted(void *thandle, void *pRow);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -287,9 +287,9 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
return TSDB_CODE_MND_INVALID_DB_OPTION; return TSDB_CODE_MND_INVALID_DB_OPTION;
} }
if (pCfg->replications < TSDB_MIN_REPLICA_NUM || pCfg->replications > TSDB_MAX_REPLICA_NUM) { if (pCfg->replications < TSDB_MIN_DB_REPLICA_OPTION || pCfg->replications > TSDB_MAX_DB_REPLICA_OPTION) {
mError("invalid db option replications:%d valid range: [%d, %d]", pCfg->replications, TSDB_MIN_REPLICA_NUM, mError("invalid db option replications:%d valid range: [%d, %d]", pCfg->replications, TSDB_MIN_DB_REPLICA_OPTION,
TSDB_MAX_REPLICA_NUM); TSDB_MAX_DB_REPLICA_OPTION);
return TSDB_CODE_MND_INVALID_DB_OPTION; return TSDB_CODE_MND_INVALID_DB_OPTION;
} }

View File

@ -393,7 +393,7 @@ void sdbCleanUp() {
} }
void sdbIncRef(void *handle, void *pObj) { void sdbIncRef(void *handle, void *pObj) {
if (pObj == NULL) return; if (pObj == NULL || handle == NULL) return;
SSdbTable *pTable = handle; SSdbTable *pTable = handle;
int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos); int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos);
@ -402,7 +402,7 @@ void sdbIncRef(void *handle, void *pObj) {
} }
void sdbDecRef(void *handle, void *pObj) { void sdbDecRef(void *handle, void *pObj) {
if (pObj == NULL) return; if (pObj == NULL || handle == NULL) return;
SSdbTable *pTable = handle; SSdbTable *pTable = handle;
int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos); int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos);
@ -661,6 +661,14 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
bool sdbCheckRowDeleted(void *pTableInput, void *pRow) {
SSdbTable *pTable = pTableInput;
if (pTable == NULL) return false;
int8_t *updateEnd = pRow + pTable->refCountPos - 1;
return (*updateEnd == 1);
}
int32_t sdbDeleteRow(SSdbOper *pOper) { int32_t sdbDeleteRow(SSdbOper *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table; SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;

View File

@ -72,7 +72,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessDropTableMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessDropTableMsg(SMnodeMsg *mnodeMsg);
static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg);
static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg); static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn);
static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg); static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *mnodeMsg);
@ -376,7 +376,7 @@ static void mnodeCleanupChildTables() {
} }
static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCtable) { static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCtable) {
pStable->numOfTables++; atomic_add_fetch_32(&pStable->numOfTables, 1);
if (pStable->vgHash == NULL) { if (pStable->vgHash == NULL) {
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
@ -385,18 +385,22 @@ static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCt
if (pStable->vgHash != NULL) { if (pStable->vgHash != NULL) {
if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) { if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) {
taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId)); taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId));
mDebug("table:%s, vgId:%d is put into stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
(int32_t)taosHashGetSize(pStable->vgHash));
} }
} }
} }
static void mnodeRemoveTableFromStable(SSuperTableObj *pStable, SChildTableObj *pCtable) { static void mnodeRemoveTableFromStable(SSuperTableObj *pStable, SChildTableObj *pCtable) {
pStable->numOfTables--; atomic_sub_fetch_32(&pStable->numOfTables, 1);
if (pStable->vgHash == NULL) return; if (pStable->vgHash == NULL) return;
SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId); SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
taosHashRemove(pStable->vgHash, (char *)&pCtable->vgId, sizeof(pCtable->vgId)); taosHashRemove(pStable->vgHash, (char *)&pCtable->vgId, sizeof(pCtable->vgId));
mDebug("table:%s, vgId:%d is remove from stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
(int32_t)taosHashGetSize(pStable->vgHash));
} }
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
} }
@ -748,11 +752,15 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
} }
if (pMsg->pTable->type == TSDB_SUPER_TABLE) { if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
mInfo("app:%p:%p, table:%s, start to drop stable", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId); SSuperTableObj *pSTable = (SSuperTableObj *)pMsg->pTable;
mInfo("app:%p:%p, table:%s, start to drop stable, uid:%" PRIu64 ", numOfChildTables:%d, sizeOfVgList:%d",
pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId, pSTable->uid, pSTable->numOfTables, (int32_t)taosHashGetSize(pSTable->vgHash));
return mnodeProcessDropSuperTableMsg(pMsg); return mnodeProcessDropSuperTableMsg(pMsg);
} else { } else {
mInfo("app:%p:%p, table:%s, start to drop ctable", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId); SChildTableObj *pCTable = (SChildTableObj *)pMsg->pTable;
return mnodeProcessDropChildTableMsg(pMsg); mInfo("app:%p:%p, table:%s, start to drop ctable, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
pDrop->tableId, pCTable->vgId, pCTable->sid, pCTable->uid);
return mnodeProcessDropChildTableMsg(pMsg, true);
} }
} }
@ -799,7 +807,7 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
assert(pTable); assert(pTable);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
mLInfo("stable:%s, is created in sdb", pTable->info.tableId); mLInfo("stable:%s, is created in sdb, uid:%" PRIu64, pTable->info.tableId, pTable->uid);
} else { } else {
mError("app:%p:%p, stable:%s, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, mError("app:%p:%p, stable:%s, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code)); tstrerror(code));
@ -887,7 +895,7 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable;
if (pStable->numOfTables != 0) { if (pStable->vgHash != NULL /*pStable->numOfTables != 0*/) {
SHashMutableIterator *pIter = taosHashCreateIter(pStable->vgHash); SHashMutableIterator *pIter = taosHashCreateIter(pStable->vgHash);
while (taosHashIterNext(pIter)) { while (taosHashIterNext(pIter)) {
int32_t *pVgId = taosHashIterGet(pIter); int32_t *pVgId = taosHashIterGet(pIter);
@ -1756,7 +1764,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
} }
} }
static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
if (pMsg->pVgroup == NULL) pMsg->pVgroup = mnodeGetVgroup(pTable->vgId); if (pMsg->pVgroup == NULL) pMsg->pVgroup = mnodeGetVgroup(pTable->vgId);
if (pMsg->pVgroup == NULL) { if (pMsg->pVgroup == NULL) {
@ -1780,7 +1788,9 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup);
mInfo("app:%p:%p, table:%s, send drop ctable msg", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId); mInfo("app:%p:%p, table:%s, send drop ctable msg, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
pDrop->tableId, pTable->vgId, pTable->sid, pTable->uid);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.ahandle = pMsg, .ahandle = pMsg,
.pCont = pDrop, .pCont = pDrop,
@ -1789,6 +1799,8 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
}; };
if (!needReturn) rpcMsg.ahandle = NULL;
dnodeSendMsgToDnode(&ipSet, &rpcMsg); dnodeSendMsgToDnode(&ipSet, &rpcMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
@ -2112,7 +2124,7 @@ static void mnodeDropAllChildTablesInStable(SSuperTableObj *pStable) {
int32_t numOfTables = 0; int32_t numOfTables = 0;
SChildTableObj *pTable = NULL; SChildTableObj *pTable = NULL;
mInfo("stable:%s, all child tables(%d) will dropped from sdb", pStable->info.tableId, numOfTables); mInfo("stable:%s, all child tables:%d will dropped from sdb", pStable->info.tableId, pStable->numOfTables);
while (1) { while (1) {
pIter = mnodeGetNextChildTable(pIter, &pTable); pIter = mnodeGetNextChildTable(pIter, &pTable);
@ -2187,12 +2199,15 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable; SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
assert(pTable); assert(pTable);
mInfo("app:%p:%p, table:%s, drop table rsp received, thandle:%p result:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); mInfo("app:%p:%p, table:%s, drop table rsp received, vgId:%d sid:%d uid:%" PRIu64 ", thandle:%p result:%s",
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid,
mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
if (rpcMsg->code != TSDB_CODE_SUCCESS) { if (rpcMsg->code != TSDB_CODE_SUCCESS) {
mError("app:%p:%p, table:%s, failed to drop in dnode, reason:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg, mError("app:%p:%p, table:%s, failed to drop in dnode, vgId:%d sid:%d uid:%" PRIu64 ", reason:%s",
pTable->info.tableId, tstrerror(rpcMsg->code)); mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid,
tstrerror(rpcMsg->code));
dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code); dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code);
return; return;
} }
@ -2239,6 +2254,14 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable; SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
assert(pTable); assert(pTable);
// If the table is deleted by another thread during creation, stop creating and send drop msg to vnode
if (sdbCheckRowDeleted(tsChildTableSdb, pTable)) {
mDebug("app:%p:%p, table:%s, create table rsp received, but a deleting opertion incoming, vgId:%d sid:%d uid:%" PRIu64,
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid);
mnodeProcessDropChildTableMsg(mnodeMsg, false);
rpcMsg->code = TSDB_CODE_SUCCESS;
}
if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
SCMCreateTableMsg *pCreate = mnodeMsg->rpcMsg.pCont; SCMCreateTableMsg *pCreate = mnodeMsg->rpcMsg.pCont;
if (pCreate->getMeta) { if (pCreate->getMeta) {

View File

@ -61,6 +61,9 @@
#define HTTP_CHECK_BODY_CONTINUE 0 #define HTTP_CHECK_BODY_CONTINUE 0
#define HTTP_CHECK_BODY_SUCCESS 1 #define HTTP_CHECK_BODY_SUCCESS 1
#define HTTP_READ_DATA_SUCCESS 0
#define HTTP_READ_DATA_FAILED 1
#define HTTP_WRITE_RETRY_TIMES 500 #define HTTP_WRITE_RETRY_TIMES 500
#define HTTP_WRITE_WAIT_TIME_MS 5 #define HTTP_WRITE_WAIT_TIME_MS 5
#define HTTP_EXPIRED_TIME 60000 #define HTTP_EXPIRED_TIME 60000

View File

@ -23,6 +23,6 @@ void httpCleanUpConnect();
void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void httpCleanUpServer(HttpServer *pServer); void httpCleanUpServer(HttpServer *pServer);
bool httpReadDataImp(HttpContext *pContext); int httpReadDataImp(HttpContext *pContext);
#endif #endif

View File

@ -60,6 +60,7 @@ bool httpParseURL(HttpContext* pContext) {
char* pSeek; char* pSeek;
char* pEnd = strchr(pParser->pLast, ' '); char* pEnd = strchr(pParser->pLast, ' ');
if (pEnd == NULL) { if (pEnd == NULL) {
httpSendErrorResp(pContext, HTTP_UNSUPPORT_URL);
return false; return false;
} }
@ -275,14 +276,14 @@ bool httpParseChunkedBody(HttpContext* pContext, HttpParser* pParser, bool test)
return true; return true;
} }
bool httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) { int httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) {
bool parsedOk = httpParseChunkedBody(pContext, pParser, true); bool parsedOk = httpParseChunkedBody(pContext, pParser, true);
if (parsedOk) { if (parsedOk) {
httpParseChunkedBody(pContext, pParser, false); httpParseChunkedBody(pContext, pParser, false);
return HTTP_CHECK_BODY_SUCCESS; return HTTP_CHECK_BODY_SUCCESS;
} else { } else {
httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, continue read", pContext, pContext->fd, pContext->ipstr); httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, continue read", pContext, pContext->fd, pContext->ipstr);
if (!httpReadDataImp(pContext)) { if (httpReadDataImp(pContext) != HTTP_READ_DATA_SUCCESS) {
httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr); httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr);
return HTTP_CHECK_BODY_ERROR; return HTTP_CHECK_BODY_ERROR;
} else { } else {
@ -296,7 +297,6 @@ int httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) {
if (dataReadLen > pParser->data.len) { if (dataReadLen > pParser->data.len) {
httpError("context:%p, fd:%d, ip:%s, un-chunked body length invalid, read size:%d dataReadLen:%d > pContext->data.len:%d", httpError("context:%p, fd:%d, ip:%s, un-chunked body length invalid, read size:%d dataReadLen:%d > pContext->data.len:%d",
pContext, pContext->fd, pContext->ipstr, pContext->parser.bufsize, dataReadLen, pParser->data.len); pContext, pContext->fd, pContext->ipstr, pContext->parser.bufsize, dataReadLen, pParser->data.len);
httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR);
return HTTP_CHECK_BODY_ERROR; return HTTP_CHECK_BODY_ERROR;
} else if (dataReadLen < pParser->data.len) { } else if (dataReadLen < pParser->data.len) {
httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, read size:%d dataReadLen:%d < pContext->data.len:%d, continue read", httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, read size:%d dataReadLen:%d < pContext->data.len:%d, continue read",
@ -358,20 +358,13 @@ bool httpParseRequest(HttpContext* pContext) {
} }
int httpCheckReadCompleted(HttpContext* pContext) { int httpCheckReadCompleted(HttpContext* pContext) {
HttpParser *pParser = &pContext->parser; HttpParser* pParser = &pContext->parser;
if (pContext->httpChunked == HTTP_UNCUNKED) {
int ret = httpReadUnChunkedBody(pContext, pParser);
if (ret != HTTP_CHECK_BODY_SUCCESS) {
return ret;
}
} else {
int ret = httpReadChunkedBody(pContext, pParser);
if (ret != HTTP_CHECK_BODY_SUCCESS) {
return ret;
}
}
return HTTP_CHECK_BODY_SUCCESS; if (pContext->httpChunked == HTTP_UNCUNKED) {
return httpReadUnChunkedBody(pContext, pParser);
} else {
return httpReadChunkedBody(pContext, pParser);
}
} }
bool httpDecodeRequest(HttpContext* pContext) { bool httpDecodeRequest(HttpContext* pContext) {

View File

@ -69,7 +69,7 @@ void httpCleanUpConnect() {
httpDebug("http server:%s is cleaned up", pServer->label); httpDebug("http server:%s is cleaned up", pServer->label);
} }
bool httpReadDataImp(HttpContext *pContext) { int httpReadDataImp(HttpContext *pContext) {
HttpParser *pParser = &pContext->parser; HttpParser *pParser = &pContext->parser;
while (pParser->bufsize <= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) { while (pParser->bufsize <= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) {
@ -85,8 +85,7 @@ bool httpReadDataImp(HttpContext *pContext) {
} else { } else {
httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect", httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect",
pContext, pContext->fd, pContext->ipstr, errno); pContext, pContext->fd, pContext->ipstr, errno);
httpReleaseContext(pContext); return HTTP_READ_DATA_FAILED;
return false;
} }
} else { } else {
pParser->bufsize += nread; pParser->bufsize += nread;
@ -95,15 +94,13 @@ bool httpReadDataImp(HttpContext *pContext) {
if (pParser->bufsize >= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) { if (pParser->bufsize >= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) {
httpError("context:%p, fd:%d, ip:%s, thread:%s, request big than:%d", httpError("context:%p, fd:%d, ip:%s, thread:%s, request big than:%d",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, HTTP_BUFFER_SIZE); pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, HTTP_BUFFER_SIZE);
httpSendErrorResp(pContext, HTTP_REQUSET_TOO_BIG); return HTTP_REQUSET_TOO_BIG;
httpNotifyContextClose(pContext);
return false;
} }
} }
pParser->buffer[pParser->bufsize] = 0; pParser->buffer[pParser->bufsize] = 0;
return true; return HTTP_READ_DATA_SUCCESS;
} }
static bool httpDecompressData(HttpContext *pContext) { static bool httpDecompressData(HttpContext *pContext) {
@ -141,8 +138,14 @@ static bool httpReadData(HttpContext *pContext) {
httpInitContext(pContext); httpInitContext(pContext);
} }
if (!httpReadDataImp(pContext)) { int32_t code = httpReadDataImp(pContext);
if (code != HTTP_READ_DATA_SUCCESS) {
if (code == HTTP_READ_DATA_FAILED) {
httpReleaseContext(pContext);
} else {
httpSendErrorResp(pContext, code);
httpNotifyContextClose(pContext); httpNotifyContextClose(pContext);
}
return false; return false;
} }

View File

@ -5627,17 +5627,23 @@ static void freeQInfo(SQInfo *pQInfo);
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols) { STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols) {
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) {
return NULL;
}
SQuery *pQuery = calloc(1, sizeof(SQuery));
pQInfo->runtimeEnv.pQuery = pQuery;
int16_t numOfCols = pQueryMsg->numOfCols; int16_t numOfCols = pQueryMsg->numOfCols;
int16_t numOfOutput = pQueryMsg->numOfOutput; int16_t numOfOutput = pQueryMsg->numOfOutput;
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) {
goto _cleanup_qinfo;
}
// to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo;
pQInfo->tableGroupInfo = *pTableGroupInfo;
SQuery *pQuery = calloc(1, sizeof(SQuery));
if (pQuery == NULL) {
goto _cleanup_query;
}
pQInfo->runtimeEnv.pQuery = pQuery;
pQuery->numOfCols = numOfCols; pQuery->numOfCols = numOfCols;
pQuery->numOfOutput = numOfOutput; pQuery->numOfOutput = numOfOutput;
pQuery->limit.limit = pQueryMsg->limit; pQuery->limit.limit = pQueryMsg->limit;
@ -5651,6 +5657,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit; pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit;
pQuery->fillType = pQueryMsg->fillType; pQuery->fillType = pQueryMsg->fillType;
pQuery->numOfTags = pQueryMsg->numOfTags; pQuery->numOfTags = pQueryMsg->numOfTags;
pQuery->tagColList = pTagCols;
// todo do not allocate ?? // todo do not allocate ??
pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
@ -5663,8 +5670,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQuery->colList[i].filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pQuery->colList[i].numOfFilters); pQuery->colList[i].filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pQuery->colList[i].numOfFilters);
} }
pQuery->tagColList = pTagCols;
// calculate the result row size // calculate the result row size
for (int16_t col = 0; col < numOfOutput; ++col) { for (int16_t col = 0; col < numOfOutput; ++col) {
assert(pExprs[col].bytes > 0); assert(pExprs[col].bytes > 0);
@ -5709,10 +5714,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
memcpy(pQuery->fillVal, (char *)pQueryMsg->fillVal, pQuery->numOfOutput * sizeof(int64_t)); memcpy(pQuery->fillVal, (char *)pQueryMsg->fillVal, pQuery->numOfOutput * sizeof(int64_t));
} }
// to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo;
pQInfo->tableGroupInfo = *pTableGroupInfo;
size_t numOfGroups = 0; size_t numOfGroups = 0;
if (pTableGroupInfo->pGroupList != NULL) { if (pTableGroupInfo->pGroupList != NULL) {
numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList); numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList);
@ -5775,6 +5776,21 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo); qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
return pQInfo; return pQInfo;
_cleanup_qinfo:
tsdbDestoryTableGroup(pTableGroupInfo);
_cleanup_query:
taosArrayDestroy(pGroupbyExpr->columnInfo);
tfree(pGroupbyExpr);
tfree(pTagCols);
for (int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = &pExprs[i];
if (pExprInfo->pExpr != NULL) {
tExprTreeDestroy(&pExprInfo->pExpr, NULL);
}
}
tfree(pExprs);
_cleanup: _cleanup:
freeQInfo(pQInfo); freeQInfo(pQInfo);
return NULL; return NULL;
@ -5893,6 +5909,7 @@ static void freeQInfo(SQInfo *pQInfo) {
} }
// todo refactor, extract method to destroytableDataInfo // todo refactor, extract method to destroytableDataInfo
if (pQInfo->tableqinfoGroupInfo.pGroupList != NULL) {
int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo); int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
for (int32_t i = 0; i < numOfGroups; ++i) { for (int32_t i = 0; i < numOfGroups; ++i) {
SArray *p = GET_TABLEGROUP(pQInfo, i); SArray *p = GET_TABLEGROUP(pQInfo, i);
@ -5907,6 +5924,7 @@ static void freeQInfo(SQInfo *pQInfo) {
taosArrayDestroy(p); taosArrayDestroy(p);
} }
}
tfree(pQInfo->pBuf); tfree(pQInfo->pBuf);
taosArrayDestroy(pQInfo->tableqinfoGroupInfo.pGroupList); taosArrayDestroy(pQInfo->tableqinfoGroupInfo.pGroupList);

View File

@ -57,8 +57,30 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
STable * super = NULL; STable * super = NULL;
STable * table = NULL; STable * table = NULL;
int newSuper = 0; int newSuper = 0;
int tid = pCfg->tableId.tid;
STable * pTable = NULL;
STable *pTable = tsdbGetTableByUid(pMeta, pCfg->tableId.uid); if (tid < 0 || tid >= pRepo->config.maxTables) {
tsdbError("vgId:%d failed to create table since invalid tid %d", REPO_ID(pRepo), tid);
terrno = TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO;
goto _err;
}
if (pMeta->tables[tid] != NULL) {
if (TABLE_UID(pMeta->tables[tid]) == pCfg->tableId.uid) {
tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
TABLE_TID(pTable), TABLE_UID(pTable));
return TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
} else {
tsdbError("vgId:%d table %s at tid %d uid %" PRIu64
" exists, replace it with new table, this can be not reasonable",
REPO_ID(pRepo), TABLE_CHAR_NAME(pMeta->tables[tid]), TABLE_TID(pMeta->tables[tid]),
TABLE_UID(pMeta->tables[tid]));
tsdbDropTable(pRepo, pMeta->tables[tid]->tableId);
}
}
pTable = tsdbGetTableByUid(pMeta, pCfg->tableId.uid);
if (pTable != NULL) { if (pTable != NULL) {
tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
TABLE_TID(pTable), TABLE_UID(pTable)); TABLE_TID(pTable), TABLE_UID(pTable));
@ -72,10 +94,10 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
super = tsdbNewTable(pCfg, true); super = tsdbNewTable(pCfg, true);
if (super == NULL) goto _err; if (super == NULL) goto _err;
} else { } else {
// TODO if (TABLE_TYPE(super) != TSDB_SUPER_TABLE || TABLE_UID(super) != pCfg->superUid) {
if (super->type != TSDB_SUPER_TABLE) return -1; terrno = TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO;
if (super->tableId.uid != pCfg->superUid) return -1; goto _err;
// tsdbUpdateTable(pRepo, super, pCfg); }
} }
} }
@ -705,6 +727,9 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
T_REF_INC(pTable); T_REF_INC(pTable);
tsdbDebug("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
TABLE_UID(pTable));
return pTable; return pTable;
_err: _err:
@ -714,7 +739,9 @@ _err:
static void tsdbFreeTable(STable *pTable) { static void tsdbFreeTable(STable *pTable) {
if (pTable) { if (pTable) {
if (pTable->name != NULL) tsdbDebug("table %s is destroyed", TABLE_CHAR_NAME(pTable)); if (pTable->name != NULL)
tsdbDebug("table %s tid %d uid %" PRIu64 " is destroyed", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
TABLE_UID(pTable));
tfree(TABLE_NAME(pTable)); tfree(TABLE_NAME(pTable));
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) { if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
for (int i = 0; i < TSDB_MAX_TABLE_SCHEMAS; i++) { for (int i = 0; i < TSDB_MAX_TABLE_SCHEMAS; i++) {
@ -782,7 +809,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
tsdbGetTableSchemaImpl(pTable, false, false, -1)); tsdbGetTableSchemaImpl(pTable, false, false, -1));
} }
tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tsdbDebug("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
TABLE_TID(pTable), TABLE_UID(pTable)); TABLE_TID(pTable), TABLE_UID(pTable));
return 0; return 0;

View File

@ -23,13 +23,12 @@ extern "C" {
#include "os.h" #include "os.h"
#define TARRAY_MIN_SIZE 8 #define TARRAY_MIN_SIZE 8
#define TARRAY_GET_ELEM(array, index) ((void*)((array)->pData + (index) * (array)->elemSize)) #define TARRAY_GET_ELEM(array, index) ((void*)((char*)((array)->pData) + (index) * (array)->elemSize))
typedef struct SArray { typedef struct SArray {
size_t size; size_t size;
size_t capacity; size_t capacity;
size_t elemSize; size_t elemSize;
void* pData; void* pData;
} SArray; } SArray;

View File

@ -176,16 +176,28 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
pVnode->status = TAOS_VN_STATUS_UPDATING; pVnode->status = TAOS_VN_STATUS_UPDATING;
int32_t code = vnodeSaveCfg(pVnodeCfg); int32_t code = vnodeSaveCfg(pVnodeCfg);
if (code != TSDB_CODE_SUCCESS) return code; if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
}
code = vnodeReadCfg(pVnode); code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) return code; if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
}
code = syncReconfig(pVnode->sync, &pVnode->syncCfg); code = syncReconfig(pVnode->sync, &pVnode->syncCfg);
if (code != TSDB_CODE_SUCCESS) return code; if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
}
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
if (code != TSDB_CODE_SUCCESS) return code; if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
}
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is altered", pVnode->vgId); vDebug("vgId:%d, vnode is altered", pVnode->vgId);

View File

@ -184,6 +184,8 @@ int vnodeWriteToQueue(void *param, void *data, int type) {
memcpy(pWal, pHead, size); memcpy(pWal, pHead, size);
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount);
taosWriteQitem(pVnode->wqueue, type, pWal); taosWriteQitem(pVnode->wqueue, type, pWal);
return 0; return 0;

View File

@ -110,7 +110,7 @@ echo "second ${HOSTNAME}:7200" >> $TAOS_CFG
echo "serverPort ${NODE}" >> $TAOS_CFG echo "serverPort ${NODE}" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 135" >> $TAOS_CFG echo "debugFlag 131" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG echo "dDebugFlag 135" >> $TAOS_CFG