Merge remote-tracking branch 'origin/3.0' into feature/qnode2
This commit is contained in:
commit
e10c66d8a0
|
@ -1332,7 +1332,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
int8_t ignoreNotExists;
|
int64_t suid;
|
||||||
} SVDropTbReq;
|
} SVDropTbReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -421,6 +421,10 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
|
||||||
pMgmt->totalVnodes = numOfVnodes;
|
pMgmt->totalVnodes = numOfVnodes;
|
||||||
|
|
||||||
int32_t threadNum = pDnode->env.numOfCores;
|
int32_t threadNum = pDnode->env.numOfCores;
|
||||||
|
#if 1
|
||||||
|
threadNum = 1;
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
|
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
|
||||||
|
|
||||||
SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
|
SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
|
||||||
|
|
|
@ -179,7 +179,12 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
|
static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
|
SUserObj *pUser = NULL;
|
||||||
|
SDbObj *pDb = NULL;
|
||||||
|
SConnObj *pConn = NULL;
|
||||||
|
int32_t code = -1;
|
||||||
|
|
||||||
SConnectReq *pConnReq = pReq->rpcMsg.pCont;
|
SConnectReq *pConnReq = pReq->rpcMsg.pCont;
|
||||||
pConnReq->pid = htonl(pConnReq->pid);
|
pConnReq->pid = htonl(pConnReq->pid);
|
||||||
pConnReq->startTime = htobe64(pConnReq->startTime);
|
pConnReq->startTime = htobe64(pConnReq->startTime);
|
||||||
|
@ -187,54 +192,61 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
|
||||||
SRpcConnInfo info = {0};
|
SRpcConnInfo info = {0};
|
||||||
if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
|
if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
|
||||||
mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr());
|
mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr());
|
||||||
return -1;
|
goto CONN_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
char ip[30];
|
char ip[30];
|
||||||
taosIp2String(info.clientIp, ip);
|
taosIp2String(info.clientIp, ip);
|
||||||
|
|
||||||
|
pUser = mndAcquireUser(pMnode, pReq->user);
|
||||||
|
if (pUser == NULL) {
|
||||||
|
mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
|
||||||
|
goto CONN_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
if (pConnReq->db[0]) {
|
if (pConnReq->db[0]) {
|
||||||
snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pReq->acctId, TS_PATH_DELIMITER, pConnReq->db);
|
snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, pConnReq->db);
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, pReq->db);
|
pDb = mndAcquireDb(pMnode, pReq->db);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_DB;
|
terrno = TSDB_CODE_MND_INVALID_DB;
|
||||||
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr());
|
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr());
|
||||||
return -1;
|
goto CONN_OVER;
|
||||||
}
|
}
|
||||||
mndReleaseDb(pMnode, pDb);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SConnObj *pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime);
|
pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime);
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
|
mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
|
||||||
return -1;
|
goto CONN_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp));
|
SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp));
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
mndReleaseConn(pMnode, pConn);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr());
|
mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr());
|
||||||
return -1;
|
goto CONN_OVER;
|
||||||
}
|
|
||||||
|
|
||||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
|
|
||||||
if (pUser != NULL) {
|
|
||||||
pRsp->acctId = htonl(pUser->acctId);
|
|
||||||
pRsp->superUser = pUser->superUser;
|
|
||||||
mndReleaseUser(pMnode, pUser);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pRsp->acctId = htonl(pUser->acctId);
|
||||||
|
pRsp->superUser = pUser->superUser;
|
||||||
pRsp->clusterId = htobe64(pMnode->clusterId);
|
pRsp->clusterId = htobe64(pMnode->clusterId);
|
||||||
pRsp->connId = htonl(pConn->id);
|
pRsp->connId = htonl(pConn->id);
|
||||||
mndGetMnodeEpSet(pMnode, &pRsp->epSet);
|
mndGetMnodeEpSet(pMnode, &pRsp->epSet);
|
||||||
mndReleaseConn(pMnode, pConn);
|
|
||||||
|
|
||||||
pReq->contLen = sizeof(SConnectRsp);
|
pReq->contLen = sizeof(SConnectRsp);
|
||||||
pReq->pCont = pRsp;
|
pReq->pCont = pRsp;
|
||||||
|
|
||||||
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app);
|
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app);
|
||||||
return 0;
|
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
CONN_OVER:
|
||||||
|
|
||||||
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
mndReleaseConn(pMnode, pConn);
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
|
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
|
||||||
|
@ -258,33 +270,27 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
|
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
char *batchReqStr = pReq->rpcMsg.pCont;
|
char *batchReqStr = pReq->rpcMsg.pCont;
|
||||||
SClientHbBatchReq batchReq = {0};
|
SClientHbBatchReq batchReq = {0};
|
||||||
tDeserializeSClientHbBatchReq(batchReqStr, &batchReq);
|
tDeserializeSClientHbBatchReq(batchReqStr, &batchReq);
|
||||||
SArray *pArray = batchReq.reqs;
|
SArray *pArray = batchReq.reqs;
|
||||||
int sz = taosArrayGetSize(pArray);
|
int sz = taosArrayGetSize(pArray);
|
||||||
|
|
||||||
SClientHbBatchRsp batchRsp = {0};
|
SClientHbBatchRsp batchRsp = {0};
|
||||||
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
|
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
|
||||||
|
|
||||||
for (int i = 0; i < sz; i++) {
|
for (int i = 0; i < sz; i++) {
|
||||||
SClientHbReq* pHbReq = taosArrayGet(pArray, i);
|
SClientHbReq *pHbReq = taosArrayGet(pArray, i);
|
||||||
if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
|
if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
|
||||||
|
|
||||||
} else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
|
} else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
|
||||||
SClientHbRsp rsp = {
|
SClientHbRsp rsp = {.status = 0, .connKey = pHbReq->connKey, .bodyLen = 0, .body = NULL};
|
||||||
.status = 0,
|
|
||||||
.connKey = pHbReq->connKey,
|
|
||||||
.bodyLen = 0,
|
|
||||||
.body = NULL
|
|
||||||
};
|
|
||||||
taosArrayPush(batchRsp.rsps, &rsp);
|
taosArrayPush(batchRsp.rsps, &rsp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
|
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
|
||||||
void* buf = rpcMallocCont(tlen);
|
void *buf = rpcMallocCont(tlen);
|
||||||
void* bufCopy = buf;
|
void *bufCopy = buf;
|
||||||
tSerializeSClientHbBatchRsp(&bufCopy, &batchRsp);
|
tSerializeSClientHbBatchRsp(&bufCopy, &batchRsp);
|
||||||
pReq->contLen = tlen;
|
pReq->contLen = tlen;
|
||||||
pReq->pCont = buf;
|
pReq->pCont = buf;
|
||||||
|
|
|
@ -31,16 +31,16 @@ static SSdbRaw *mndStbActionEncode(SStbObj *pStb);
|
||||||
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
|
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
|
||||||
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
|
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
|
||||||
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
|
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
|
||||||
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb);
|
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
|
||||||
static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg);
|
static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq);
|
||||||
static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg);
|
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq);
|
||||||
static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg);
|
static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq);
|
||||||
static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg);
|
static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp);
|
||||||
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg);
|
static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp);
|
||||||
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg);
|
static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp);
|
||||||
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq);
|
||||||
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
|
static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||||
static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
int32_t mndInitStb(SMnode *pMnode) {
|
int32_t mndInitStb(SMnode *pMnode) {
|
||||||
|
@ -52,13 +52,13 @@ int32_t mndInitStb(SMnode *pMnode) {
|
||||||
.updateFp = (SdbUpdateFp)mndStbActionUpdate,
|
.updateFp = (SdbUpdateFp)mndStbActionUpdate,
|
||||||
.deleteFp = (SdbDeleteFp)mndStbActionDelete};
|
.deleteFp = (SdbDeleteFp)mndStbActionDelete};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcesSMCreateStbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcesSMAlterStbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcesSMDropStbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessCreateStbInRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessVCreateStbRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessAlterStbInRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVAlterStbRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessDropStbInRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaReq);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STB, mndGetStbMeta);
|
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STB, mndGetStbMeta);
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
|
||||||
|
@ -177,27 +177,27 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb) {
|
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
||||||
mTrace("stb:%s, perform update action, old row:%p new row:%p", pOldStb->name, pOldStb, pNewStb);
|
mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
|
||||||
atomic_exchange_32(&pOldStb->updateTime, pNewStb->updateTime);
|
atomic_exchange_32(&pOld->updateTime, pNew->updateTime);
|
||||||
atomic_exchange_32(&pOldStb->version, pNewStb->version);
|
atomic_exchange_32(&pOld->version, pNew->version);
|
||||||
|
|
||||||
taosWLockLatch(&pOldStb->lock);
|
taosWLockLatch(&pOld->lock);
|
||||||
pOldStb->numOfColumns = pNewStb->numOfColumns;
|
pOld->numOfColumns = pNew->numOfColumns;
|
||||||
pOldStb->numOfTags = pNewStb->numOfTags;
|
pOld->numOfTags = pNew->numOfTags;
|
||||||
int32_t totalCols = pNewStb->numOfTags + pNewStb->numOfColumns;
|
int32_t totalCols = pNew->numOfTags + pNew->numOfColumns;
|
||||||
int32_t totalSize = totalCols * sizeof(SSchema);
|
int32_t totalSize = totalCols * sizeof(SSchema);
|
||||||
|
|
||||||
if (pOldStb->numOfTags + pOldStb->numOfColumns < totalCols) {
|
if (pOld->numOfTags + pOld->numOfColumns < totalCols) {
|
||||||
void *pSchema = malloc(totalSize);
|
void *pSchema = malloc(totalSize);
|
||||||
if (pSchema != NULL) {
|
if (pSchema != NULL) {
|
||||||
free(pOldStb->pSchema);
|
free(pOld->pSchema);
|
||||||
pOldStb->pSchema = pSchema;
|
pOld->pSchema = pSchema;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pOldStb->pSchema, pNewStb->pSchema, totalSize);
|
memcpy(pOld->pSchema, pNew->pSchema, totalSize);
|
||||||
taosWUnLockLatch(&pOldStb->lock);
|
taosWUnLockLatch(&pOld->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,7 +215,7 @@ void mndReleaseStb(SMnode *pMnode, SStbObj *pStb) {
|
||||||
sdbRelease(pSdb, pStb);
|
sdbRelease(pSdb, pStb);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) {
|
static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
tNameFromString(&name, stbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
tNameFromString(&name, stbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
|
||||||
|
@ -225,17 +225,17 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) {
|
||||||
return mndAcquireDb(pMnode, db);
|
return mndAcquireDb(pMnode, db);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int *pContLen) {
|
static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
|
||||||
SVCreateTbReq req;
|
SVCreateTbReq req;
|
||||||
void *buf;
|
void *buf;
|
||||||
int bsize;
|
int32_t bsize;
|
||||||
SMsgHead *pMsgHead;
|
SMsgHead *pMsgHead;
|
||||||
|
|
||||||
req.ver = 0;
|
req.ver = 0;
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
tNameFromString(&name, pStb->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);
|
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
|
||||||
req.name = (char*) tNameGetTableName(&name);
|
req.name = (char *)tNameGetTableName(&name);
|
||||||
req.ttl = 0;
|
req.ttl = 0;
|
||||||
req.keep = 0;
|
req.keep = 0;
|
||||||
req.type = TD_SUPER_TABLE;
|
req.type = TD_SUPER_TABLE;
|
||||||
|
@ -264,7 +264,7 @@ static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SVDropTbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
|
static SVDropTbReq *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
|
||||||
int32_t contLen = sizeof(SVDropTbReq);
|
int32_t contLen = sizeof(SVDropTbReq);
|
||||||
|
|
||||||
SVDropTbReq *pDrop = calloc(1, contLen);
|
SVDropTbReq *pDrop = calloc(1, contLen);
|
||||||
|
@ -276,12 +276,12 @@ static SVDropTbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj
|
||||||
pDrop->head.contLen = htonl(contLen);
|
pDrop->head.contLen = htonl(contLen);
|
||||||
pDrop->head.vgId = htonl(pVgroup->vgId);
|
pDrop->head.vgId = htonl(pVgroup->vgId);
|
||||||
memcpy(pDrop->name, pStb->name, TSDB_TABLE_FNAME_LEN);
|
memcpy(pDrop->name, pStb->name, TSDB_TABLE_FNAME_LEN);
|
||||||
// pDrop->suid = htobe64(pStb->uid);
|
pDrop->suid = htobe64(pStb->uid);
|
||||||
|
|
||||||
return pDrop;
|
return pDrop;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCheckCreateStbMsg(SMCreateStbReq *pCreate) {
|
static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
|
||||||
pCreate->numOfColumns = htonl(pCreate->numOfColumns);
|
pCreate->numOfColumns = htonl(pCreate->numOfColumns);
|
||||||
pCreate->numOfTags = htonl(pCreate->numOfTags);
|
pCreate->numOfTags = htonl(pCreate->numOfTags);
|
||||||
int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags;
|
int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags;
|
||||||
|
@ -356,15 +356,15 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
int contLen;
|
int32_t contLen;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
if (pVgroup->dbUid != pDb->uid) continue;
|
if (pVgroup->dbUid != pDb->uid) continue;
|
||||||
|
|
||||||
void *pMsg = mndBuildCreateStbMsg(pMnode, pVgroup, pStb, &contLen);
|
void *pReq = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen);
|
||||||
if (pMsg == NULL) {
|
if (pReq == NULL) {
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -373,11 +373,11 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
action.pCont = pMsg;
|
action.pCont = pReq;
|
||||||
action.contLen = contLen;
|
action.contLen = contLen;
|
||||||
action.msgType = TDMT_VND_CREATE_STB;
|
action.msgType = TDMT_VND_CREATE_STB;
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
free(pMsg);
|
free(pReq);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -398,8 +398,8 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
if (pVgroup->dbUid != pDb->uid) continue;
|
if (pVgroup->dbUid != pDb->uid) continue;
|
||||||
|
|
||||||
SVDropTbReq *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb);
|
SVDropTbReq *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb);
|
||||||
if (pMsg == NULL) {
|
if (pReq == NULL) {
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -408,11 +408,11 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
action.pCont = pMsg;
|
action.pCont = pReq;
|
||||||
action.contLen = sizeof(SVDropTbReq);
|
action.contLen = sizeof(SVDropTbReq);
|
||||||
action.msgType = TDMT_VND_DROP_STB;
|
action.msgType = TDMT_VND_DROP_STB;
|
||||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
||||||
free(pMsg);
|
free(pReq);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -423,7 +423,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SMCreateStbReq *pCreate, SDbObj *pDb) {
|
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
|
||||||
SStbObj stbObj = {0};
|
SStbObj stbObj = {0};
|
||||||
tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
tstrncpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
tstrncpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
@ -449,43 +449,17 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SMCreateStbReq *pCr
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) goto CREATE_STB_OVER;
|
||||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
|
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
|
||||||
|
|
||||||
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
|
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
|
||||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
|
||||||
goto CREATE_STB_OVER;
|
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
|
||||||
}
|
if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
|
||||||
|
if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
|
||||||
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_STB_OVER;
|
||||||
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
|
|
||||||
goto CREATE_STB_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
|
|
||||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
|
||||||
goto CREATE_STB_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) {
|
|
||||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
|
||||||
goto CREATE_STB_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, &stbObj) != 0) {
|
|
||||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
|
||||||
goto CREATE_STB_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
|
@ -494,13 +468,13 @@ CREATE_STB_OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) {
|
static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SMCreateStbReq *pCreate = pMsg->rpcMsg.pCont;
|
SMCreateStbReq *pCreate = pReq->rpcMsg.pCont;
|
||||||
|
|
||||||
mDebug("stb:%s, start to create", pCreate->name);
|
mDebug("stb:%s, start to create", pCreate->name);
|
||||||
|
|
||||||
if (mndCheckCreateStbMsg(pCreate) != 0) {
|
if (mndCheckCreateStbReq(pCreate) != 0) {
|
||||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -536,7 +510,7 @@ static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = mndCreateStb(pMnode, pMsg, pCreate, pDb);
|
int32_t code = mndCreateStb(pMnode, pReq, pCreate, pDb);
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -548,12 +522,12 @@ static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) {
|
||||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) {
|
static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) {
|
||||||
mndTransProcessRsp(pMsg);
|
mndTransProcessRsp(pRsp);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCheckAlterStbMsg(SMAlterStbReq *pAlter) {
|
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
|
||||||
SSchema *pSchema = &pAlter->schema;
|
SSchema *pSchema = &pAlter->schema;
|
||||||
pSchema->colId = htonl(pSchema->colId);
|
pSchema->colId = htonl(pSchema->colId);
|
||||||
pSchema->bytes = htonl(pSchema->bytes);
|
pSchema->bytes = htonl(pSchema->bytes);
|
||||||
|
@ -578,15 +552,15 @@ static int32_t mndCheckAlterStbMsg(SMAlterStbReq *pAlter) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOldStb, SStbObj *pNewStb) { return 0; }
|
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pOld, SStbObj *pNew) { return 0; }
|
||||||
|
|
||||||
static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg) {
|
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SMAlterStbReq *pAlter = pMsg->rpcMsg.pCont;
|
SMAlterStbReq *pAlter = pReq->rpcMsg.pCont;
|
||||||
|
|
||||||
mDebug("stb:%s, start to alter", pAlter->name);
|
mDebug("stb:%s, start to alter", pAlter->name);
|
||||||
|
|
||||||
if (mndCheckAlterStbMsg(pAlter) != 0) {
|
if (mndCheckAlterStbReq(pAlter) != 0) {
|
||||||
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
|
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -601,7 +575,7 @@ static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg) {
|
||||||
SStbObj stbObj = {0};
|
SStbObj stbObj = {0};
|
||||||
memcpy(&stbObj, pStb, sizeof(SStbObj));
|
memcpy(&stbObj, pStb, sizeof(SStbObj));
|
||||||
|
|
||||||
int32_t code = mndUpdateStb(pMnode, pMsg, pStb, &stbObj);
|
int32_t code = mndUpdateStb(pMnode, pReq, pStb, &stbObj);
|
||||||
mndReleaseStb(pMnode, pStb);
|
mndReleaseStb(pMnode, pStb);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -612,8 +586,8 @@ static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg) {
|
||||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) {
|
static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp) {
|
||||||
mndTransProcessRsp(pMsg);
|
mndTransProcessRsp(pRsp);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -648,44 +622,19 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SStbObj
|
||||||
|
|
||||||
static int32_t mndSetDropStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; }
|
static int32_t mndSetDropStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; }
|
||||||
|
|
||||||
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) {
|
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pStb) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL)goto DROP_STB_OVER;
|
||||||
mError("stb:%s, failed to drop since %s", pStb->name, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
||||||
|
|
||||||
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) {
|
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
||||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
if (mndSetDropStbUndoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
||||||
goto DROP_STB_OVER;
|
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
||||||
}
|
if (mndSetDropStbRedoActions(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
||||||
|
if (mndSetDropStbUndoActions(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
||||||
if (mndSetDropStbUndoLogs(pMnode, pTrans, pStb) != 0) {
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_STB_OVER;
|
||||||
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
|
|
||||||
goto DROP_STB_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) {
|
|
||||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
|
||||||
goto DROP_STB_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndSetDropStbRedoActions(pMnode, pTrans, pStb) != 0) {
|
|
||||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
|
||||||
goto DROP_STB_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndSetDropStbUndoActions(pMnode, pTrans, pStb) != 0) {
|
|
||||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
|
||||||
goto DROP_STB_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
|
||||||
goto DROP_STB_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
|
@ -694,9 +643,9 @@ DROP_STB_OVER:
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg) {
|
static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SMDropStbReq *pDrop = pMsg->rpcMsg.pCont;
|
SMDropStbReq *pDrop = pReq->rpcMsg.pCont;
|
||||||
|
|
||||||
mDebug("stb:%s, start to drop", pDrop->name);
|
mDebug("stb:%s, start to drop", pDrop->name);
|
||||||
|
|
||||||
|
@ -712,7 +661,7 @@ static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = mndDropStb(pMnode, pMsg, pStb);
|
int32_t code = mndDropStb(pMnode, pReq, pStb);
|
||||||
mndReleaseStb(pMnode, pStb);
|
mndReleaseStb(pMnode, pStb);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -724,14 +673,14 @@ static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg) {
|
||||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) {
|
static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp) {
|
||||||
mndTransProcessRsp(pMsg);
|
mndTransProcessRsp(pRsp);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
|
static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
|
STableInfoReq *pInfo = pReq->rpcMsg.pCont;
|
||||||
|
|
||||||
mDebug("stb:%s, start to retrieve meta", pInfo->tableFname);
|
mDebug("stb:%s, start to retrieve meta", pInfo->tableFname);
|
||||||
|
|
||||||
|
@ -786,8 +735,8 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
mndReleaseStb(pMnode, pStb);
|
mndReleaseStb(pMnode, pStb);
|
||||||
|
|
||||||
pMsg->pCont = pMeta;
|
pReq->pCont = pMeta;
|
||||||
pMsg->contLen = contLen;
|
pReq->contLen = contLen;
|
||||||
|
|
||||||
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pStb->numOfColumns, pStb->numOfTags);
|
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pStb->numOfColumns, pStb->numOfTags);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -820,8 +769,8 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
|
static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
if (mndGetNumOfStbs(pMnode, pShow->db, &pShow->numOfRows) != 0) {
|
if (mndGetNumOfStbs(pMnode, pShow->db, &pShow->numOfRows) != 0) {
|
||||||
|
@ -883,8 +832,8 @@ static void mndExtractTableName(char *tableId, char *name) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
SStbObj *pStb = NULL;
|
SStbObj *pStb = NULL;
|
||||||
|
|
|
@ -85,7 +85,6 @@ enum {
|
||||||
typedef struct STableCheckInfo {
|
typedef struct STableCheckInfo {
|
||||||
uint64_t tableId;
|
uint64_t tableId;
|
||||||
TSKEY lastKey;
|
TSKEY lastKey;
|
||||||
STable* pTableObj;
|
|
||||||
SBlockInfo* pCompInfo;
|
SBlockInfo* pCompInfo;
|
||||||
int32_t compSize;
|
int32_t compSize;
|
||||||
int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks
|
int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks
|
||||||
|
@ -141,8 +140,6 @@ typedef struct STsdbReadHandle {
|
||||||
STableBlockInfo* pDataBlockInfo;
|
STableBlockInfo* pDataBlockInfo;
|
||||||
SDataCols *pDataCols; // in order to hold current file data block
|
SDataCols *pDataCols; // in order to hold current file data block
|
||||||
int32_t allocSize; // allocated data block size
|
int32_t allocSize; // allocated data block size
|
||||||
// STsdb
|
|
||||||
// STsdbMemTable * pMemTable;
|
|
||||||
SArray *defaultLoadColumn;// default load column
|
SArray *defaultLoadColumn;// default load column
|
||||||
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
|
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
|
||||||
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
|
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
|
||||||
|
@ -204,8 +201,8 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load
|
||||||
int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
|
int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
|
||||||
|
|
||||||
// the primary timestamp column does not be included in the the specified load column list, add it
|
// the primary timestamp column does not be included in the the specified load column list, add it
|
||||||
if (loadTS && colId != 0) {
|
if (loadTS && colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
int16_t columnId = 0;
|
int16_t columnId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||||
taosArrayInsert(pLocalIdList, 0, &columnId);
|
taosArrayInsert(pLocalIdList, 0, &columnId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,7 +289,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
||||||
for (int32_t j = 0; j < gsize; ++j) {
|
for (int32_t j = 0; j < gsize; ++j) {
|
||||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
|
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
|
||||||
|
|
||||||
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable };
|
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey};
|
||||||
// assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
|
// assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
|
||||||
// info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
|
// info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
|
||||||
|
|
||||||
|
@ -315,10 +312,9 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
||||||
|
|
||||||
// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
|
// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
|
||||||
size_t gsize = taosArrayGetSize(pTableCheckInfo);
|
size_t gsize = taosArrayGetSize(pTableCheckInfo);
|
||||||
for (int32_t i = 0; i < gsize; ++i) {
|
// for (int32_t i = 0; i < gsize; ++i) {
|
||||||
STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, i);
|
// STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, i);
|
||||||
taosArrayPush(pTable, &pInfo->pTableObj);
|
// }
|
||||||
}
|
|
||||||
|
|
||||||
*psTable = pTable;
|
*psTable = pTable;
|
||||||
return pTableCheckInfo;
|
return pTableCheckInfo;
|
||||||
|
@ -347,15 +343,11 @@ static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
|
||||||
// only one table, not need to sort again
|
// only one table, not need to sort again
|
||||||
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
|
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
|
||||||
SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
|
SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
|
||||||
SArray* pTable = taosArrayInit(1, sizeof(STable*));
|
|
||||||
|
|
||||||
STableCheckInfo info = { .lastKey = skey, .pTableObj = pCheckInfo->pTableObj};
|
STableCheckInfo info = { .lastKey = skey};
|
||||||
|
|
||||||
info.tableId = pCheckInfo->tableId;
|
info.tableId = pCheckInfo->tableId;
|
||||||
taosArrayPush(pNew, &info);
|
taosArrayPush(pNew, &info);
|
||||||
taosArrayPush(pTable, &pCheckInfo->pTableObj);
|
|
||||||
|
|
||||||
*psTable = pTable;
|
|
||||||
return pNew;
|
return pNew;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -461,9 +453,6 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
|
||||||
pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
|
pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
// STsdbMeta* pMeta = NULL;//tsdbGetMeta(tsdb);
|
|
||||||
// assert(pMeta != NULL);
|
|
||||||
|
|
||||||
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock);
|
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock);
|
||||||
if (pReadHandle->pDataCols == NULL) {
|
if (pReadHandle->pDataCols == NULL) {
|
||||||
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pReadHandle, pReadHandle->qId);
|
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pReadHandle, pReadHandle->qId);
|
||||||
|
@ -641,12 +630,6 @@ SArray* tsdbGetQueriedTableList(tsdbReadHandleT *pHandle) {
|
||||||
|
|
||||||
size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
|
size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
|
||||||
SArray* res = taosArrayInit(size, POINTER_BYTES);
|
SArray* res = taosArrayInit(size, POINTER_BYTES);
|
||||||
|
|
||||||
for(int32_t i = 0; i < size; ++i) {
|
|
||||||
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
|
|
||||||
taosArrayPush(res, &pCheckInfo->pTableObj);
|
|
||||||
}
|
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1049,7 +1032,10 @@ static int32_t loadBlockInfo(STsdbReadHandle * pTsdbReadHandle, int32_t index, i
|
||||||
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
|
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
|
||||||
pCheckInfo->numOfBlocks = 0;
|
pCheckInfo->numOfBlocks = 0;
|
||||||
|
|
||||||
if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) {
|
STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId};
|
||||||
|
table.pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
|
||||||
|
|
||||||
|
if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1149,7 +1135,7 @@ static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfB
|
||||||
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
|
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
STSchema *pSchema = NULL;//tsdbGetTableSchema(pCheckInfo->pTableObj);
|
STSchema *pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
|
||||||
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema);
|
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbError("%p failed to malloc buf for pDataCols, 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId);
|
tsdbError("%p failed to malloc buf for pDataCols, 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId);
|
||||||
|
@ -1184,7 +1170,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
|
||||||
|
|
||||||
pBlockLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
|
pBlockLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
|
||||||
pBlockLoadInfo->slot = pTsdbReadHandle->cur.slot;
|
pBlockLoadInfo->slot = pTsdbReadHandle->cur.slot;
|
||||||
pBlockLoadInfo->uid = pCheckInfo->pTableObj->uid;
|
pBlockLoadInfo->uid = pCheckInfo->tableId;
|
||||||
|
|
||||||
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
|
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
|
||||||
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
|
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
|
||||||
|
@ -1878,7 +1864,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
||||||
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
|
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
|
||||||
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
|
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
|
||||||
|
|
||||||
STable* pTable = pCheckInfo->pTableObj;
|
STable* pTable = NULL;
|
||||||
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
|
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
|
||||||
|
|
||||||
tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
|
tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
|
||||||
|
@ -1932,7 +1918,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
||||||
rv2 = memRowVersion(row2);
|
rv2 = memRowVersion(row2);
|
||||||
}
|
}
|
||||||
|
|
||||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, true);
|
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, true);
|
||||||
numOfRows += 1;
|
numOfRows += 1;
|
||||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||||
cur->win.skey = key;
|
cur->win.skey = key;
|
||||||
|
@ -1958,7 +1944,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
||||||
}
|
}
|
||||||
|
|
||||||
bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
|
bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
|
||||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, forceSetNull);
|
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull);
|
||||||
numOfRows += 1;
|
numOfRows += 1;
|
||||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||||
cur->win.skey = key;
|
cur->win.skey = key;
|
||||||
|
@ -2745,7 +2731,7 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
|
||||||
// if (ret != TSDB_CODE_SUCCESS) {
|
// if (ret != TSDB_CODE_SUCCESS) {
|
||||||
// return false;
|
// return false;
|
||||||
// }
|
// }
|
||||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->pTableObj, NULL, NULL, true);
|
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId, NULL, NULL, true);
|
||||||
tfree(pRow);
|
tfree(pRow);
|
||||||
|
|
||||||
// update the last key value
|
// update the last key value
|
||||||
|
@ -3389,14 +3375,14 @@ SArray* tsdbRetrieveDataBlock(tsdbReadHandleT* pTsdbReadHandle, SArray* pIdList)
|
||||||
if (pHandle->cur.mixBlock) {
|
if (pHandle->cur.mixBlock) {
|
||||||
return pHandle->pColumns;
|
return pHandle->pColumns;
|
||||||
} else {
|
} else {
|
||||||
SDataBlockInfo binfo = {0};/*GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);*/
|
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
|
||||||
assert(pHandle->realNumOfRows <= binfo.rows);
|
assert(pHandle->realNumOfRows <= binfo.rows);
|
||||||
|
|
||||||
// data block has been loaded, todo extract method
|
// data block has been loaded, todo extract method
|
||||||
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
|
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
|
||||||
|
|
||||||
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fid == pHandle->cur.fid &&
|
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fid == pHandle->cur.fid &&
|
||||||
pBlockLoadInfo->uid == pCheckInfo->pTableObj->tid) {
|
pBlockLoadInfo->uid == pCheckInfo->tableId) {
|
||||||
return pHandle->pColumns;
|
return pHandle->pColumns;
|
||||||
} else { // only load the file block
|
} else { // only load the file block
|
||||||
SBlock* pBlock = pBlockInfo->compBlock;
|
SBlock* pBlock = pBlockInfo->compBlock;
|
||||||
|
|
|
@ -551,7 +551,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
|
||||||
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
|
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
|
||||||
int numOfColIds) {
|
int numOfColIds) {
|
||||||
ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
|
ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
|
||||||
ASSERT(colIds[0] == 0);
|
ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||||
|
|
||||||
SDFile * pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
|
SDFile * pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
|
||||||
SBlockCol blockCol = {0};
|
SBlockCol blockCol = {0};
|
||||||
|
@ -588,7 +588,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
|
||||||
if (pDataCol == NULL) continue;
|
if (pDataCol == NULL) continue;
|
||||||
ASSERT(pDataCol->colId == colId);
|
ASSERT(pDataCol->colId == colId);
|
||||||
|
|
||||||
if (colId == 0) { // load the key row
|
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { // load the key row
|
||||||
blockCol.colId = colId;
|
blockCol.colId = colId;
|
||||||
blockCol.len = pBlock->keyLen;
|
blockCol.len = pBlock->keyLen;
|
||||||
blockCol.type = pDataCol->type;
|
blockCol.type = pDataCol->type;
|
||||||
|
|
|
@ -4927,6 +4927,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
||||||
|
|
||||||
SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq));
|
SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq));
|
||||||
if (NULL == pMsg) { // todo handle malloc error
|
if (NULL == pMsg) { // todo handle malloc error
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
|
@ -7381,6 +7382,7 @@ int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* r
|
||||||
cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets);
|
cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets);
|
||||||
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
|
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
|
||||||
cond.twindow = pTableScanNode->window;
|
cond.twindow = pTableScanNode->window;
|
||||||
|
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
|
||||||
|
|
||||||
for(int32_t i = 0; i < cond.numOfCols; ++i) {
|
for(int32_t i = 0; i < cond.numOfCols; ++i) {
|
||||||
SExprInfo* pExprInfo = taosArrayGetP(pTableScanNode->scan.node.pTargets, i);
|
SExprInfo* pExprInfo = taosArrayGetP(pTableScanNode->scan.node.pTargets, i);
|
||||||
|
|
|
@ -13,23 +13,126 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#ifdef USE_UV
|
||||||
|
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
#include "transportInt.h"
|
#include "transportInt.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
int main() {
|
struct QueueElem {
|
||||||
SRpcInit init = {.localPort = 6030, .label = "rpc", .numOfThreads = 5};
|
queue q;
|
||||||
void* p = rpcOpen(&init);
|
int val;
|
||||||
|
};
|
||||||
while (1) {
|
class QueueObj {
|
||||||
std::cout << "cron task" << std::endl;
|
public:
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000));
|
QueueObj() {
|
||||||
|
// avoid formate
|
||||||
|
QUEUE_INIT(&head);
|
||||||
}
|
}
|
||||||
|
void Push(QueueElem *el) {
|
||||||
|
// avoid formate
|
||||||
|
QUEUE_PUSH(&head, &el->q);
|
||||||
|
}
|
||||||
|
QueueElem *Pop() {
|
||||||
|
QueueElem *el = NULL;
|
||||||
|
if (!IsEmpty()) {
|
||||||
|
queue *h = QUEUE_HEAD(&head);
|
||||||
|
el = QUEUE_DATA(h, QueueElem, q);
|
||||||
|
QUEUE_REMOVE(&el->q);
|
||||||
|
}
|
||||||
|
return el;
|
||||||
|
}
|
||||||
|
bool IsEmpty() {
|
||||||
|
// avoid formate
|
||||||
|
return QUEUE_IS_EMPTY(&head);
|
||||||
|
}
|
||||||
|
void RmElem(QueueElem *el) {
|
||||||
|
// impl
|
||||||
|
QUEUE_REMOVE(&el->q);
|
||||||
|
}
|
||||||
|
void ForEach(std::vector<int> &result) {
|
||||||
|
queue *h;
|
||||||
|
QUEUE_FOREACH(h, &head) {
|
||||||
|
// add more
|
||||||
|
QueueElem *el = QUEUE_DATA(h, QueueElem, q);
|
||||||
|
result.push_back(el->val);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
queue head;
|
||||||
|
};
|
||||||
|
|
||||||
|
class QueueEnv : public ::testing::Test {
|
||||||
|
protected:
|
||||||
|
virtual void SetUp() {
|
||||||
|
// TODO
|
||||||
|
q = new QueueObj();
|
||||||
|
}
|
||||||
|
virtual void TearDown() {
|
||||||
|
delete q;
|
||||||
|
// formate
|
||||||
|
}
|
||||||
|
QueueObj *q;
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(QueueEnv, testPushAndPop) {
|
||||||
|
// add more test
|
||||||
|
assert(q->IsEmpty());
|
||||||
|
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
QueueElem *el = (QueueElem *)malloc(sizeof(QueueElem));
|
||||||
|
el->val = i;
|
||||||
|
q->Push(el);
|
||||||
|
}
|
||||||
|
int i = 0;
|
||||||
|
while (!q->IsEmpty()) {
|
||||||
|
QueueElem *el = q->Pop();
|
||||||
|
assert(el->val == i++);
|
||||||
|
free(el);
|
||||||
|
}
|
||||||
|
assert(q->IsEmpty());
|
||||||
}
|
}
|
||||||
|
TEST_F(QueueEnv, testRm) {
|
||||||
|
// add more test
|
||||||
|
|
||||||
|
std::vector<QueueElem *> set;
|
||||||
|
assert(q->IsEmpty());
|
||||||
|
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
QueueElem *el = (QueueElem *)malloc(sizeof(QueueElem));
|
||||||
|
el->val = i;
|
||||||
|
q->Push(el);
|
||||||
|
set.push_back(el);
|
||||||
|
}
|
||||||
|
for (int i = set.size() - 1; i >= 0; i--) {
|
||||||
|
QueueElem *el = set[i];
|
||||||
|
q->RmElem(el);
|
||||||
|
free(el);
|
||||||
|
}
|
||||||
|
assert(q->IsEmpty());
|
||||||
|
}
|
||||||
|
TEST_F(QueueEnv, testIter) {
|
||||||
|
// add more test
|
||||||
|
assert(q->IsEmpty());
|
||||||
|
std::vector<int> vals;
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
QueueElem *el = (QueueElem *)malloc(sizeof(QueueElem));
|
||||||
|
el->val = i;
|
||||||
|
q->Push(el);
|
||||||
|
vals.push_back(i);
|
||||||
|
}
|
||||||
|
std::vector<int> result;
|
||||||
|
q->ForEach(result);
|
||||||
|
assert(result.size() == vals.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
Loading…
Reference in New Issue