Merge remote-tracking branch 'origin/develop' into feature/sim
This commit is contained in:
commit
150632a1b3
|
@ -5,13 +5,13 @@
|
||||||
"port": 6030,
|
"port": 6030,
|
||||||
"user": "root",
|
"user": "root",
|
||||||
"password": "taosdata",
|
"password": "taosdata",
|
||||||
"databases": "db01",
|
"databases": "db",
|
||||||
"super_table_query":
|
"specified_table_query":
|
||||||
{"concurrent":1, "mode":"sync", "interval":5000, "restart":"yes", "keepProgress":"yes",
|
{"concurrent":1, "mode":"sync", "interval":5000, "restart":"yes", "keepProgress":"yes",
|
||||||
"sqls": [{"sql": "select avg(c1) from stb01 where col1 > 1;", "result": "./subscribe_res0.txt"}]
|
"sqls": [{"sql": "select avg(col1) from stb01 where col1 > 1;", "result": "./subscribe_res0.txt"}]
|
||||||
},
|
},
|
||||||
"sub_table_query":
|
"super_table_query":
|
||||||
{"stblname": "stb01", "threads":1, "mode":"sync", "interval":10000, "restart":"yes", "keepProgress":"yes",
|
{"stblname": "stb", "threads":1, "mode":"sync", "interval":10000, "restart":"yes", "keepProgress":"yes",
|
||||||
"sqls": [{"sql": "select col1 from xxxx where col1 > 10;", "result": "./subscribe_res1.txt"}]
|
"sqls": [{"sql": "select col1 from xxxx where col1 > 10;", "result": "./subscribe_res1.txt"}]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4269,23 +4269,24 @@ void *subSubscribeProcess(void *sarg) {
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
// start loop to consume result
|
// start loop to consume result
|
||||||
|
TAOS_RES* res = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) {
|
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) {
|
||||||
if (1 == g_queryInfo.subQueryInfo.subscribeMode) {
|
if (1 == g_queryInfo.subQueryInfo.subscribeMode) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* res = taos_consume(g_queryInfo.subQueryInfo.tsub[i]);
|
res = taos_consume(g_queryInfo.subQueryInfo.tsub[i]);
|
||||||
if (res) {
|
if (res) {
|
||||||
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
||||||
if (g_queryInfo.subQueryInfo.result[i][0] != 0) {
|
if (g_queryInfo.subQueryInfo.result[i][0] != 0) {
|
||||||
sprintf(tmpFile, "%s-%d", g_queryInfo.subQueryInfo.result[i], winfo->threadID);
|
sprintf(tmpFile, "%s-%d", g_queryInfo.subQueryInfo.result[i], winfo->threadID);
|
||||||
}
|
}
|
||||||
getResult(res, tmpFile);
|
getResult(res, tmpFile);
|
||||||
taos_free_result(res);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
taos_free_result(res);
|
||||||
|
|
||||||
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) {
|
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) {
|
||||||
taos_unsubscribe(g_queryInfo.subQueryInfo.tsub[i], g_queryInfo.subQueryInfo.subscribeKeepProgress);
|
taos_unsubscribe(g_queryInfo.subQueryInfo.tsub[i], g_queryInfo.subQueryInfo.subscribeKeepProgress);
|
||||||
|
@ -4328,23 +4329,24 @@ void *superSubscribeProcess(void *sarg) {
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
// start loop to consume result
|
// start loop to consume result
|
||||||
|
TAOS_RES* res = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
||||||
if (1 == g_queryInfo.superQueryInfo.subscribeMode) {
|
if (1 == g_queryInfo.superQueryInfo.subscribeMode) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* res = taos_consume(g_queryInfo.superQueryInfo.tsub[i]);
|
res = taos_consume(g_queryInfo.superQueryInfo.tsub[i]);
|
||||||
if (res) {
|
if (res) {
|
||||||
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
||||||
if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
|
if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
|
||||||
sprintf(tmpFile, "%s-%d", g_queryInfo.superQueryInfo.result[i], winfo->threadID);
|
sprintf(tmpFile, "%s-%d", g_queryInfo.superQueryInfo.result[i], winfo->threadID);
|
||||||
}
|
}
|
||||||
getResult(res, tmpFile);
|
getResult(res, tmpFile);
|
||||||
taos_free_result(res);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
taos_free_result(res);
|
||||||
|
|
||||||
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
||||||
taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
||||||
|
|
|
@ -259,7 +259,7 @@ typedef struct {
|
||||||
int16_t bytes[TSDB_MAX_COLUMNS];
|
int16_t bytes[TSDB_MAX_COLUMNS];
|
||||||
int32_t numOfReads;
|
int32_t numOfReads;
|
||||||
int8_t maxReplica;
|
int8_t maxReplica;
|
||||||
int8_t reserved0[0];
|
int8_t reserved0[1];
|
||||||
uint16_t payloadLen;
|
uint16_t payloadLen;
|
||||||
char payload[];
|
char payload[];
|
||||||
} SShowObj;
|
} SShowObj;
|
||||||
|
|
|
@ -60,7 +60,7 @@ typedef struct SSdbRow {
|
||||||
int32_t (*fpReq)(SMnodeMsg *pMsg);
|
int32_t (*fpReq)(SMnodeMsg *pMsg);
|
||||||
int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code);
|
int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code);
|
||||||
char reserveForSync[24];
|
char reserveForSync[24];
|
||||||
SWalHead pHead[];
|
SWalHead pHead;
|
||||||
} SSdbRow;
|
} SSdbRow;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -274,7 +274,7 @@ static int32_t sdbGetSyncVersion(int32_t vgId, uint64_t *fver, uint64_t *vver) {
|
||||||
|
|
||||||
// failed to forward, need revert insert
|
// failed to forward, need revert insert
|
||||||
static void sdbHandleFailedConfirm(SSdbRow *pRow) {
|
static void sdbHandleFailedConfirm(SSdbRow *pRow) {
|
||||||
SWalHead *pHead = pRow->pHead;
|
SWalHead *pHead = &pRow->pHead;
|
||||||
int32_t action = pHead->msgType % 10;
|
int32_t action = pHead->msgType % 10;
|
||||||
|
|
||||||
sdbError("vgId:1, row:%p:%s hver:%" PRIu64 " action:%s, failed to foward since %s", pRow->pObj,
|
sdbError("vgId:1, row:%p:%s hver:%" PRIu64 " action:%s, failed to foward since %s", pRow->pObj,
|
||||||
|
@ -1012,7 +1012,7 @@ static void sdbFreeQueue() {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbWriteToQueue(SSdbRow *pRow, int32_t qtype) {
|
static int32_t sdbWriteToQueue(SSdbRow *pRow, int32_t qtype) {
|
||||||
SWalHead *pHead = pRow->pHead;
|
SWalHead *pHead = &pRow->pHead;
|
||||||
|
|
||||||
if (pHead->len > TSDB_MAX_WAL_SIZE) {
|
if (pHead->len > TSDB_MAX_WAL_SIZE) {
|
||||||
sdbError("vgId:1, wal len:%d exceeds limit, hver:%" PRIu64, pHead->len, pHead->version);
|
sdbError("vgId:1, wal len:%d exceeds limit, hver:%" PRIu64, pHead->len, pHead->version);
|
||||||
|
@ -1051,8 +1051,8 @@ static int32_t sdbWriteFwdToQueue(int32_t vgId, void *wparam, int32_t qtype, voi
|
||||||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pRow->pHead, pHead, sizeof(SWalHead) + pHead->len);
|
memcpy(&pRow->pHead, pHead, sizeof(SWalHead) + pHead->len);
|
||||||
pRow->rowData = pRow->pHead->cont;
|
pRow->rowData = pRow->pHead.cont;
|
||||||
|
|
||||||
int32_t code = sdbWriteToQueue(pRow, qtype);
|
int32_t code = sdbWriteToQueue(pRow, qtype);
|
||||||
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) code = 0;
|
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) code = 0;
|
||||||
|
@ -1073,7 +1073,7 @@ static int32_t sdbWriteRowToQueue(SSdbRow *pInputRow, int32_t action) {
|
||||||
memcpy(pRow, pInputRow, sizeof(SSdbRow));
|
memcpy(pRow, pInputRow, sizeof(SSdbRow));
|
||||||
pRow->processedCount = 1;
|
pRow->processedCount = 1;
|
||||||
|
|
||||||
SWalHead *pHead = pRow->pHead;
|
SWalHead *pHead = &pRow->pHead;
|
||||||
pRow->rowData = pHead->cont;
|
pRow->rowData = pHead->cont;
|
||||||
(*pTable->fpEncode)(pRow);
|
(*pTable->fpEncode)(pRow);
|
||||||
|
|
||||||
|
@ -1103,9 +1103,9 @@ static void *sdbWorkerFp(void *pWorker) {
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
taosGetQitem(tsSdbWQall, &qtype, (void **)&pRow);
|
taosGetQitem(tsSdbWQall, &qtype, (void **)&pRow);
|
||||||
sdbTrace("vgId:1, msg:%p, row:%p hver:%" PRIu64 ", will be processed in sdb queue", pRow->pMsg, pRow->pObj,
|
sdbTrace("vgId:1, msg:%p, row:%p hver:%" PRIu64 ", will be processed in sdb queue", pRow->pMsg, pRow->pObj,
|
||||||
pRow->pHead->version);
|
pRow->pHead.version);
|
||||||
|
|
||||||
pRow->code = sdbProcessWrite((qtype == TAOS_QTYPE_RPC) ? pRow : NULL, pRow->pHead, qtype, NULL);
|
pRow->code = sdbProcessWrite((qtype == TAOS_QTYPE_RPC) ? pRow : NULL, &pRow->pHead, qtype, NULL);
|
||||||
if (pRow->code > 0) pRow->code = 0;
|
if (pRow->code > 0) pRow->code = 0;
|
||||||
|
|
||||||
sdbTrace("vgId:1, msg:%p is processed in sdb queue, code:%x", pRow->pMsg, pRow->code);
|
sdbTrace("vgId:1, msg:%p is processed in sdb queue, code:%x", pRow->pMsg, pRow->code);
|
||||||
|
@ -1122,7 +1122,7 @@ static void *sdbWorkerFp(void *pWorker) {
|
||||||
sdbConfirmForward(1, pRow, pRow->code);
|
sdbConfirmForward(1, pRow, pRow->code);
|
||||||
} else {
|
} else {
|
||||||
if (qtype == TAOS_QTYPE_FWD) {
|
if (qtype == TAOS_QTYPE_FWD) {
|
||||||
syncConfirmForward(tsSdbMgmt.sync, pRow->pHead->version, pRow->code);
|
syncConfirmForward(tsSdbMgmt.sync, pRow->pHead.version, pRow->code);
|
||||||
}
|
}
|
||||||
sdbFreeFromQueue(pRow);
|
sdbFreeFromQueue(pRow);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue