Merge pull request #10657 from taosdata/feature/tq

define stream token
This commit is contained in:
Liu Jicong 2022-03-09 19:58:52 +08:00 committed by GitHub
commit 35b82d79c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 58 additions and 22 deletions

View File

@ -59,7 +59,7 @@ typedef struct {
SWalCfg walCfg; SWalCfg walCfg;
uint32_t hashBegin; uint32_t hashBegin;
uint32_t hashEnd; uint32_t hashEnd;
int8_t hashMethod; int8_t hashMethod;
} SVnodeCfg; } SVnodeCfg;
typedef struct { typedef struct {
@ -202,6 +202,22 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
/* ------------------------- TQ READ --------------------------- */ /* ------------------------- TQ READ --------------------------- */
enum {
TQ_STREAM_TOKEN__DATA = 1,
TQ_STREAM_TOKEN__WATERMARK,
TQ_STREAM_TOKEN__CHECKPOINT,
};
typedef struct {
int8_t type;
int8_t reserved[7];
union {
void *data;
int64_t wmTs;
int64_t checkpointId;
};
} STqStreamToken;
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList) { static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList) {

View File

@ -16,9 +16,11 @@
#ifndef _TQ_PUSH_H_ #ifndef _TQ_PUSH_H_
#define _TQ_PUSH_H_ #define _TQ_PUSH_H_
#include "executor.h"
#include "thash.h" #include "thash.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#include "vnode.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -39,11 +41,12 @@ typedef struct {
} STqClientPusher; } STqClientPusher;
typedef struct { typedef struct {
int8_t type; int8_t type;
int8_t nodeType; int8_t nodeType;
int8_t reserved[6]; int8_t reserved[6];
int64_t streamId; int64_t streamId;
SEpSet epSet; qTaskInfo_t task;
// TODO sync function
} STqStreamPusher; } STqStreamPusher;
typedef struct { typedef struct {

View File

@ -67,6 +67,26 @@ void tqClose(STQ* pTq) {
} }
int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) {
if (msgType != TDMT_VND_SUBMIT) return 0;
void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL);
while (pIter != NULL) {
STqPusher* pusher = *(STqPusher**)pIter;
if (pusher->type == TQ_PUSHER_TYPE__STREAM) {
STqStreamPusher* streamPusher = (STqStreamPusher*)pusher;
// repack
STqStreamToken* token = malloc(sizeof(STqStreamToken));
if (token == NULL) {
taosHashCancelIterate(pTq->tqPushMgr->pHash, pIter);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
token->type = TQ_STREAM_TOKEN__DATA;
token->data = msg;
// set input
// exec
}
// send msg to ep
}
// iterate hash // iterate hash
// process all msg // process all msg
// if waiting // if waiting

View File

@ -73,7 +73,7 @@ STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet
streamPusher->type = TQ_PUSHER_TYPE__STREAM; streamPusher->type = TQ_PUSHER_TYPE__STREAM;
streamPusher->nodeType = 0; streamPusher->nodeType = 0;
streamPusher->streamId = streamId; streamPusher->streamId = streamId;
memcpy(&streamPusher->epSet, pEpSet, sizeof(SEpSet)); /*memcpy(&streamPusher->epSet, pEpSet, sizeof(SEpSet));*/
if (taosHashPut(pushMgr->pHash, &streamId, sizeof(int64_t), &streamPusher, sizeof(void*)) < 0) { if (taosHashPut(pushMgr->pHash, &streamId, sizeof(int64_t), &streamPusher, sizeof(void*)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -12,7 +12,6 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "vnode.h" #include "vnode.h"
@ -37,6 +36,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
pMsg->length = htonl(pMsg->length); pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
// iterate and convert
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
while (true) { while (true) {
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;

View File

@ -34,9 +34,6 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
int32_t walRollback(SWal *pWal, int64_t ver) { int32_t walRollback(SWal *pWal, int64_t ver) {
int code; int code;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
if (ver == pWal->vers.lastVer) {
return 0;
}
if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
terrno = TSDB_CODE_WAL_INVALID_VER; terrno = TSDB_CODE_WAL_INVALID_VER;
return -1; return -1;

View File

@ -124,13 +124,8 @@ class WalRetentionEnv : public ::testing::Test {
void SetUp() override { void SetUp() override {
SWalCfg cfg; SWalCfg cfg;
cfg.rollPeriod = -1, cfg.rollPeriod = -1, cfg.segSize = -1, cfg.retentionPeriod = -1, cfg.retentionSize = 0, cfg.rollPeriod = 0,
cfg.segSize = -1, cfg.vgId = 0, cfg.level = TAOS_WAL_FSYNC;
cfg.retentionPeriod = -1,
cfg.retentionSize = 0,
cfg.rollPeriod = 0,
cfg.vgId = 0,
cfg.level = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, &cfg); pWal = walOpen(pathName, &cfg);
ASSERT(pWal != NULL); ASSERT(pWal != NULL);
} }
@ -241,6 +236,12 @@ TEST_F(WalCleanEnv, rollback) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i); ASSERT_EQ(pWal->vers.lastVer, i);
} }
code = walRollback(pWal, 12);
ASSERT_NE(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 9);
code = walRollback(pWal, 9);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 8);
code = walRollback(pWal, 5); code = walRollback(pWal, 5);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 4); ASSERT_EQ(pWal->vers.lastVer, 4);
@ -324,7 +325,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
TEST_F(WalRetentionEnv, repairMeta1) { TEST_F(WalRetentionEnv, repairMeta1) {
walResetEnv(); walResetEnv();
int code; int code;
int i; int i;
for (i = 0; i < 100; i++) { for (i = 0; i < 100; i++) {
char newStr[100]; char newStr[100];
@ -336,14 +337,14 @@ TEST_F(WalRetentionEnv, repairMeta1) {
TearDown(); TearDown();
//getchar(); // getchar();
char buf[100]; char buf[100];
sprintf(buf, "%s/meta-ver%d", pathName, 0); sprintf(buf, "%s/meta-ver%d", pathName, 0);
taosRemoveFile(buf); taosRemoveFile(buf);
sprintf(buf, "%s/meta-ver%d", pathName, 1); sprintf(buf, "%s/meta-ver%d", pathName, 1);
taosRemoveFile(buf); taosRemoveFile(buf);
SetUp(); SetUp();
//getchar(); // getchar();
ASSERT_EQ(pWal->vers.lastVer, 99); ASSERT_EQ(pWal->vers.lastVer, 99);
@ -401,5 +402,4 @@ TEST_F(WalRetentionEnv, repairMeta1) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
} }
} }
} }