commit
9587ed3efc
|
@ -33,7 +33,7 @@ typedef enum {
|
|||
TSDB_SUPER_TABLE = 1, // super table
|
||||
TSDB_CHILD_TABLE = 2, // table created from super table
|
||||
TSDB_NORMAL_TABLE = 3, // ordinary table
|
||||
TSDB_STREAM_TABLE = 4, // table created from stream computing
|
||||
TSDB_STREAM_TABLE = 4, // table created by stream processing
|
||||
TSDB_TEMP_TABLE = 5, // temp table created by nest query
|
||||
TSDB_TABLE_MAX = 6
|
||||
} ETableType;
|
||||
|
@ -50,7 +50,12 @@ typedef enum {
|
|||
TSDB_CHECK_ITEM_MAX
|
||||
} ECheckItemType;
|
||||
|
||||
typedef enum { TD_ROW_DISCARD_UPDATE = 0, TD_ROW_OVERWRITE_UPDATE = 1, TD_ROW_PARTIAL_UPDATE = 2 } TDUpdateConfig;
|
||||
typedef enum {
|
||||
TD_ROW_DISCARD_UPDATE = 0,
|
||||
TD_ROW_OVERWRITE_UPDATE = 1,
|
||||
TD_ROW_PARTIAL_UPDATE = 2,
|
||||
} TDUpdateConfig;
|
||||
|
||||
typedef enum {
|
||||
TSDB_STATIS_OK = 0, // statis part exist and load successfully
|
||||
TSDB_STATIS_NONE = 1, // statis part not exist
|
||||
|
|
|
@ -136,6 +136,23 @@ static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock)
|
|||
return (void*)buf;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
|
||||
if (pBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
// int32_t numOfOutput = pBlock->info.numOfCols;
|
||||
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
|
||||
tfree(pColInfoData->pData);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pBlock->pDataBlock);
|
||||
tfree(pBlock->pBlockAgg);
|
||||
// tfree(pBlock);
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) {
|
||||
int32_t tlen = 0;
|
||||
int32_t sz = 0;
|
||||
|
@ -178,23 +195,6 @@ static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) {
|
|||
return buf;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
|
||||
if (pBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
// int32_t numOfOutput = pBlock->info.numOfCols;
|
||||
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
|
||||
tfree(pColInfoData->pData);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pBlock->pDataBlock);
|
||||
tfree(pBlock->pBlockAgg);
|
||||
// tfree(pBlock);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) {
|
||||
if (pRsp->schemas) {
|
||||
if (pRsp->schemas->nCols) {
|
||||
|
@ -204,10 +204,6 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) {
|
|||
}
|
||||
taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock);
|
||||
pRsp->pBlockData = NULL;
|
||||
// for (int32_t i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) {
|
||||
// SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
|
||||
// tDeleteSSDataBlock(pDataBlock);
|
||||
//}
|
||||
}
|
||||
|
||||
//======================================================================================================================
|
||||
|
|
|
@ -1139,6 +1139,17 @@ int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateS
|
|||
int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq);
|
||||
void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
int64_t streamId;
|
||||
char* sql;
|
||||
char* executorMsg;
|
||||
} SMVCreateStreamReq, SMSCreateStreamReq;
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
} SMVCreateStreamRsp, SMSCreateStreamRsp;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
int8_t igExists;
|
||||
|
|
|
@ -80,6 +80,10 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad);
|
|||
*/
|
||||
int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
|
||||
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg);
|
||||
|
||||
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg);
|
||||
|
||||
/**
|
||||
* @brief Drop a snode.
|
||||
*
|
||||
|
|
|
@ -2666,7 +2666,6 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
|
|||
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
|
|
@ -90,9 +90,11 @@ typedef struct {
|
|||
int32_t refCount;
|
||||
int8_t deployed;
|
||||
int8_t dropped;
|
||||
int8_t uniqueWorkerInUse;
|
||||
SSnode *pSnode;
|
||||
SRWLatch latch;
|
||||
SDnodeWorker writeWorker;
|
||||
SArray *uniqueWorkers; // SArray<SDnodeWorker*>
|
||||
SDnodeWorker sharedWorker;
|
||||
} SSnodeMgmt;
|
||||
|
||||
typedef struct {
|
||||
|
@ -153,4 +155,4 @@ int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo);
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DND_ENV_H_*/
|
||||
#endif /*_TD_DND_ENV_H_*/
|
||||
|
|
|
@ -70,4 +70,4 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DND_INT_H_*/
|
||||
#endif /*_TD_DND_INT_H_*/
|
||||
|
|
|
@ -19,7 +19,20 @@
|
|||
#include "dndTransport.h"
|
||||
#include "dndWorker.h"
|
||||
|
||||
static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int32_t refCount;
|
||||
int32_t snVersion;
|
||||
int8_t dropped;
|
||||
char *path;
|
||||
SSnode *pImpl;
|
||||
STaosQueue *pSharedQ;
|
||||
STaosQueue *pUniqueQ;
|
||||
} SSnodeObj;
|
||||
|
||||
static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
|
||||
static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs);
|
||||
|
||||
static SSnode *dndAcquireSnode(SDnode *pDnode) {
|
||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||
|
@ -152,8 +165,21 @@ static int32_t dndWriteSnodeFile(SDnode *pDnode) {
|
|||
|
||||
static int32_t dndStartSnodeWorker(SDnode *pDnode) {
|
||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||
if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, dndProcessSnodeQueue) != 0) {
|
||||
dError("failed to start snode write worker since %s", terrstr());
|
||||
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *));
|
||||
for (int32_t i = 0; i < 2; i++) {
|
||||
SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker));
|
||||
if (pUniqueWorker == NULL) {
|
||||
return -1;
|
||||
}
|
||||
if (dndInitWorker(pDnode, pUniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, dndProcessSnodeSharedQueue) != 0) {
|
||||
dError("failed to start snode unique worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker);
|
||||
}
|
||||
if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", 4, 4,
|
||||
dndProcessSnodeSharedQueue)) {
|
||||
dError("failed to start snode shared worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -169,9 +195,13 @@ static void dndStopSnodeWorker(SDnode *pDnode) {
|
|||
|
||||
while (pMgmt->refCount > 0) {
|
||||
taosMsleep(10);
|
||||
}
|
||||
}
|
||||
|
||||
dndCleanupWorker(&pMgmt->writeWorker);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) {
|
||||
SDnodeWorker *worker = taosArrayGetP(pMgmt->uniqueWorkers, i);
|
||||
dndCleanupWorker(worker);
|
||||
}
|
||||
taosArrayDestroy(pMgmt->uniqueWorkers);
|
||||
}
|
||||
|
||||
static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) {
|
||||
|
@ -292,17 +322,36 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||
static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||
SRpcMsg *pRsp = NULL;
|
||||
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
|
||||
|
||||
SSnode *pSnode = dndAcquireSnode(pDnode);
|
||||
if (pSnode != NULL) {
|
||||
code = sndProcessMsg(pSnode, pMsg, &pRsp);
|
||||
for (int32_t i = 0; i < numOfMsgs; i++) {
|
||||
SRpcMsg *pMsg = NULL;
|
||||
taosGetQitem(qall, (void **)&pMsg);
|
||||
|
||||
sndProcessUMsg(pSnode, pMsg);
|
||||
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
}
|
||||
dndReleaseSnode(pDnode, pSnode);
|
||||
}
|
||||
|
||||
static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
|
||||
|
||||
SSnode *pSnode = dndAcquireSnode(pDnode);
|
||||
if (pSnode != NULL) {
|
||||
code = sndProcessSMsg(pSnode, pMsg);
|
||||
}
|
||||
dndReleaseSnode(pDnode, pSnode);
|
||||
|
||||
#if 0
|
||||
if (pMsg->msgType & 1u) {
|
||||
if (pRsp != NULL) {
|
||||
pRsp->ahandle = pMsg->ahandle;
|
||||
|
@ -314,11 +363,32 @@ static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) {
|
|||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void dndWriteSnodeMsgToRandomWorker(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
|
||||
|
||||
SSnode *pSnode = dndAcquireSnode(pDnode);
|
||||
if (pSnode != NULL) {
|
||||
int32_t index = (pDnode->smgmt.uniqueWorkerInUse + 1) % taosArrayGetSize(pDnode->smgmt.uniqueWorkers);
|
||||
SDnodeWorker *pWorker = taosArrayGet(pDnode->smgmt.uniqueWorkers, index);
|
||||
code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
|
||||
}
|
||||
dndReleaseSnode(pDnode, pSnode);
|
||||
|
||||
if (code != 0) {
|
||||
if (pMsg->msgType & 1u) {
|
||||
SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
|
||||
rpcSendResponse(&rsp);
|
||||
}
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
}
|
||||
|
||||
static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) {
|
||||
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
|
||||
|
||||
|
@ -337,8 +407,13 @@ static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpc
|
|||
}
|
||||
}
|
||||
|
||||
void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.writeWorker, pMsg);
|
||||
void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
// judge from msg to write to unique queue
|
||||
dndWriteSnodeMsgToRandomWorker(pDnode, pMsg);
|
||||
}
|
||||
|
||||
void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg);
|
||||
}
|
||||
|
||||
int32_t dndInitSnode(SDnode *pDnode) {
|
||||
|
|
|
@ -109,4 +109,4 @@ int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen)
|
|||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
#include "mndShow.h"
|
||||
#include "mndSnode.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndStream.h"
|
||||
#include "mndSubscribe.h"
|
||||
#include "mndSync.h"
|
||||
#include "mndTelem.h"
|
||||
|
@ -220,6 +221,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
|
|||
if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
|
||||
|
|
|
@ -31,3 +31,15 @@ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
}
|
||||
|
||||
void sndDestroy(const char *path) {}
|
||||
|
||||
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
// stream deployment
|
||||
// stream stop/resume
|
||||
// operator exec
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
// operator exec
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -83,8 +83,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
|
|||
}
|
||||
|
||||
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
|
||||
/*int32_t sversion = pHandle->pBlock->sversion;*/
|
||||
/*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/
|
||||
// currently only rows are used
|
||||
|
||||
pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
|
||||
pBlockInfo->rows = pHandle->pBlock->numOfRows;
|
||||
pBlockInfo->uid = pHandle->pBlock->uid;
|
||||
|
|
|
@ -188,7 +188,7 @@ void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueu
|
|||
|
||||
int32_t tWWorkerInit(SWWorkerPool *pool) {
|
||||
pool->nextId = 0;
|
||||
pool->workers = calloc(sizeof(SWWorker), pool->max);
|
||||
pool->workers = calloc(pool->max, sizeof(SWWorker));
|
||||
if (pool->workers == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
|
Loading…
Reference in New Issue