Merge branch 'develop' of https://github.com/taosdata/TDengine into develop
This commit is contained in:
commit
67237838c2
|
@ -678,29 +678,16 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void * key = sdbGetObjKey(pTable, pOper->pObj);
|
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
|
||||||
int32_t keySize = 0;
|
|
||||||
switch (pTable->keyType) {
|
|
||||||
case SDB_KEY_STRING:
|
|
||||||
case SDB_KEY_VAR_STRING:
|
|
||||||
keySize = strlen((char *)key) + 1;
|
|
||||||
break;
|
|
||||||
case SDB_KEY_INT:
|
|
||||||
case SDB_KEY_AUTO:
|
|
||||||
keySize = sizeof(uint32_t);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
return TSDB_CODE_MND_SDB_INVAID_KEY_TYPE;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + keySize + SDB_SYNC_HACK;
|
|
||||||
SSdbOper *pNewOper = taosAllocateQitem(size);
|
SSdbOper *pNewOper = taosAllocateQitem(size);
|
||||||
|
|
||||||
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
|
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
|
||||||
pHead->version = 0;
|
pHead->version = 0;
|
||||||
pHead->len = keySize;
|
|
||||||
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
|
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
|
||||||
memcpy(pHead->cont, key, keySize);
|
|
||||||
|
pOper->rowData = pHead->cont;
|
||||||
|
(*pTable->encodeFp)(pOper);
|
||||||
|
pHead->len = pOper->rowSize;
|
||||||
|
|
||||||
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
||||||
|
|
||||||
|
|
|
@ -1753,11 +1753,43 @@ static int32_t mnodeFindNormalTableColumnIndex(SChildTableObj *pTable, char *col
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mnodeAddNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
|
static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
|
||||||
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
|
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
|
||||||
mLPrint("app:%p:%p, ctable %s, add column result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
mError("app:%p:%p, ctable %s, failed to alter column, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(NULL, pTable);
|
||||||
|
if (pMDCreate == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMsg->pVgroup == NULL) {
|
||||||
|
pMsg->pVgroup = mnodeGetVgroup(pTable->vgId);
|
||||||
|
if (pMsg->pVgroup == NULL) {
|
||||||
|
rpcFreeCont(pMDCreate);
|
||||||
|
mError("app:%p:%p, ctable %s, vgId:%d not exist in mnode", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
||||||
|
pTable->vgId);
|
||||||
|
return TSDB_CODE_MND_VGROUP_NOT_EXIST;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup);
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.handle = pMsg,
|
||||||
|
.pCont = pMDCreate,
|
||||||
|
.contLen = htonl(pMDCreate->contLen),
|
||||||
|
.code = 0,
|
||||||
|
.msgType = TSDB_MSG_TYPE_MD_ALTER_TABLE
|
||||||
|
};
|
||||||
|
|
||||||
|
mTrace("app:%p:%p, ctable %s, send alter column msg to vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
||||||
|
pMsg->pVgroup->vgId);
|
||||||
|
|
||||||
|
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
|
||||||
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32_t ncols) {
|
static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32_t ncols) {
|
||||||
|
@ -1802,7 +1834,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
|
||||||
.table = tsChildTableSdb,
|
.table = tsChildTableSdb,
|
||||||
.pObj = pTable,
|
.pObj = pTable,
|
||||||
.pMsg = pMsg,
|
.pMsg = pMsg,
|
||||||
.cb = mnodeAddNormalTableColumnCb
|
.cb = mnodeAlterNormalTableColumnCb
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = sdbUpdateRow(&oper);
|
int32_t code = sdbUpdateRow(&oper);
|
||||||
|
@ -1813,13 +1845,6 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mnodeDropNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
|
|
||||||
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
|
|
||||||
mLPrint("app:%p:%p, ctable %s, drop column result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
|
||||||
tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
|
static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
|
||||||
SDbObj *pDb = pMsg->pDb;
|
SDbObj *pDb = pMsg->pDb;
|
||||||
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
|
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
|
||||||
|
@ -1847,7 +1872,7 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
|
||||||
.table = tsChildTableSdb,
|
.table = tsChildTableSdb,
|
||||||
.pObj = pTable,
|
.pObj = pTable,
|
||||||
.pMsg = pMsg,
|
.pMsg = pMsg,
|
||||||
.cb = mnodeDropNormalTableColumnCb
|
.cb = mnodeAlterNormalTableColumnCb
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = sdbUpdateRow(&oper);
|
int32_t code = sdbUpdateRow(&oper);
|
||||||
|
@ -2185,9 +2210,33 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// not implemented yet
|
|
||||||
static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
|
static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
|
||||||
mTrace("alter table rsp received, handle:%p code:%s", rpcMsg->handle, tstrerror(rpcMsg->code));
|
if (rpcMsg->handle == NULL) return;
|
||||||
|
|
||||||
|
SMnodeMsg *mnodeMsg = rpcMsg->handle;
|
||||||
|
mnodeMsg->received++;
|
||||||
|
|
||||||
|
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
|
||||||
|
assert(pTable);
|
||||||
|
|
||||||
|
if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
|
||||||
|
mTrace("app:%p:%p, ctable:%s, altered in dnode, thandle:%p result:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
|
||||||
|
pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
|
||||||
|
|
||||||
|
dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS);
|
||||||
|
} else {
|
||||||
|
if (mnodeMsg->retry++ < 3) {
|
||||||
|
mTrace("app:%p:%p, table:%s, alter table rsp received, need retry, times:%d result:%s thandle:%p",
|
||||||
|
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->retry, tstrerror(rpcMsg->code),
|
||||||
|
mnodeMsg->rpcMsg.handle);
|
||||||
|
|
||||||
|
dnodeDelayReprocessMnodeWriteMsg(mnodeMsg);
|
||||||
|
} else {
|
||||||
|
mError("app:%p:%p, table:%s, failed to alter in dnode, result:%s thandle:%p", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
|
||||||
|
pTable->info.tableId, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle);
|
||||||
|
dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
||||||
|
|
|
@ -437,7 +437,7 @@ static void *taosProcessTcpData(void *param) {
|
||||||
while (1) {
|
while (1) {
|
||||||
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
|
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
|
||||||
if (pThreadObj->stop) {
|
if (pThreadObj->stop) {
|
||||||
tTrace("%s, tcp thread get stop event, exiting...", pThreadObj->label);
|
tTrace("%s TCP thread get stop event, exiting...", pThreadObj->label);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (fdNum < 0) continue;
|
if (fdNum < 0) continue;
|
||||||
|
|
|
@ -142,16 +142,15 @@ void taosCleanUpUdpConnection(void *handle) {
|
||||||
pConn = pSet->udpConn + i;
|
pConn = pSet->udpConn + i;
|
||||||
pConn->signature = NULL;
|
pConn->signature = NULL;
|
||||||
|
|
||||||
// shutdown to signal the thread to exit
|
if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR);
|
||||||
if ( pConn->fd >=0) shutdown(pConn->fd, SHUT_RD);
|
if (pConn->fd >=0) taosCloseSocket(pConn->fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < pSet->threads; ++i) {
|
for (int i = 0; i < pSet->threads; ++i) {
|
||||||
pConn = pSet->udpConn + i;
|
pConn = pSet->udpConn + i;
|
||||||
if (pConn->thread) pthread_join(pConn->thread, NULL);
|
if (pConn->thread) pthread_join(pConn->thread, NULL);
|
||||||
if (pConn->fd >=0) taosCloseSocket(pConn->fd);
|
|
||||||
tfree(pConn->buffer);
|
tfree(pConn->buffer);
|
||||||
tTrace("UDP chandle:%p is closed", pConn);
|
tTrace("%s UDP thread is closed, inedx:%d", pConn->label, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pSet);
|
tfree(pSet);
|
||||||
|
@ -185,15 +184,15 @@ static void *taosRecvUdpData(void *param) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
|
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
|
||||||
if(dataLen == 0) {
|
if(dataLen <= 0) {
|
||||||
tTrace("data length is 0, socket was closed, exiting");
|
tTrace("%s UDP socket was closed, exiting", pConn->label);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
port = ntohs(sourceAdd.sin_port);
|
port = ntohs(sourceAdd.sin_port);
|
||||||
|
|
||||||
if (dataLen < sizeof(SRpcHead)) {
|
if (dataLen < sizeof(SRpcHead)) {
|
||||||
tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno));
|
tError("%s recvfrom failed(%s)", pConn->label, strerror(errno));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -133,6 +133,13 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
||||||
|
// TODO: disposed in tsdb
|
||||||
|
// STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont);
|
||||||
|
// if (pCfg == NULL) return terrno;
|
||||||
|
// if (tsdbCreateTable(pVnode->tsdb, pCfg) < 0) code = terrno;
|
||||||
|
|
||||||
|
// tsdbClearTableCfg(pCfg);
|
||||||
|
vTrace("vgId:%d, alter table msg is received", pVnode->vgId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -180,11 +180,9 @@ if $data28 != null then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print ======== step4
|
print ======== step4
|
||||||
sleep 2500
|
|
||||||
sql alter table tb drop column d
|
sql alter table tb drop column d
|
||||||
sql alter table tb drop column e
|
sql alter table tb drop column e
|
||||||
sql insert into tb values(now-19d, -19, 6, 3, 0)
|
sql insert into tb values(now-19d, -19, 6, 3, 0)
|
||||||
sleep 3000
|
|
||||||
sql select * from tb order by ts desc
|
sql select * from tb order by ts desc
|
||||||
if $rows != 4 then
|
if $rows != 4 then
|
||||||
return -1
|
return -1
|
||||||
|
@ -287,10 +285,8 @@ if $data38 != null then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print ======== step5
|
print ======== step5
|
||||||
sleep 2500
|
|
||||||
sql alter table tb drop column g
|
sql alter table tb drop column g
|
||||||
sql insert into tb values(now-16d, -16, 9, 5)
|
sql insert into tb values(now-16d, -16, 9, 5)
|
||||||
sleep 3000
|
|
||||||
sql select count(f) from tb
|
sql select count(f) from tb
|
||||||
if $data00 != 5 then
|
if $data00 != 5 then
|
||||||
return -1
|
return -1
|
||||||
|
@ -421,10 +417,8 @@ if $data48 != null then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print ======== step6
|
print ======== step6
|
||||||
sleep 2500
|
|
||||||
sql alter table tb drop column f
|
sql alter table tb drop column f
|
||||||
sql insert into tb values(now-13d, -13, 7)
|
sql insert into tb values(now-13d, -13, 7)
|
||||||
sleep 3000
|
|
||||||
sql select * from tb order by ts desc
|
sql select * from tb order by ts desc
|
||||||
if $rows != 6 then
|
if $rows != 6 then
|
||||||
return -1
|
return -1
|
||||||
|
@ -551,10 +545,8 @@ if $data58 != null then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print ======== step7
|
print ======== step7
|
||||||
sleep 2500
|
|
||||||
sql alter table tb drop column h
|
sql alter table tb drop column h
|
||||||
sql insert into tb values(now-10d, -10)
|
sql insert into tb values(now-10d, -10)
|
||||||
sleep 3000
|
|
||||||
sql select * from tb order by ts desc
|
sql select * from tb order by ts desc
|
||||||
if $rows != 7 then
|
if $rows != 7 then
|
||||||
return -1
|
return -1
|
||||||
|
|
Loading…
Reference in New Issue