Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/vnode
This commit is contained in:
commit
10a0cbd82c
|
@ -875,6 +875,7 @@ typedef struct SSubQueryMsg {
|
|||
uint64_t sId;
|
||||
uint64_t queryId;
|
||||
uint64_t taskId;
|
||||
int8_t taskType;
|
||||
uint32_t contentLen;
|
||||
char msg[];
|
||||
} SSubQueryMsg;
|
||||
|
|
|
@ -172,7 +172,6 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SCHEDULE_DATA_SINK, "vnode-schedule-data-sink", NULL, NULL)
|
||||
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
|
||||
|
|
|
@ -48,7 +48,6 @@ typedef struct SOutputData {
|
|||
int8_t compressed;
|
||||
char* pData;
|
||||
bool queryEnd;
|
||||
int32_t scheduleJobNo;
|
||||
int32_t bufStatus;
|
||||
int64_t useconds;
|
||||
int8_t precision;
|
||||
|
|
|
@ -84,6 +84,13 @@ void* qGetResultRetrieveMsg(qTaskInfo_t qinfo);
|
|||
*/
|
||||
int32_t qKillTask(qTaskInfo_t qinfo);
|
||||
|
||||
/**
|
||||
* kill the ongoing query asynchronously
|
||||
* @param qinfo qhandle
|
||||
* @return
|
||||
*/
|
||||
int32_t qAsyncKillTask(qTaskInfo_t qinfo);
|
||||
|
||||
/**
|
||||
* return whether query is completed or not
|
||||
* @param qinfo
|
||||
|
|
|
@ -28,6 +28,7 @@ OP_ENUM_MACRO(DataBlocksOptScan)
|
|||
OP_ENUM_MACRO(TableSeqScan)
|
||||
OP_ENUM_MACRO(TagScan)
|
||||
OP_ENUM_MACRO(SystemTableScan)
|
||||
OP_ENUM_MACRO(StreamBlockScan)
|
||||
OP_ENUM_MACRO(Aggregate)
|
||||
OP_ENUM_MACRO(Project)
|
||||
// OP_ENUM_MACRO(Groupby)
|
||||
|
|
|
@ -38,6 +38,11 @@ enum {
|
|||
JOB_TASK_STATUS_FREEING,
|
||||
};
|
||||
|
||||
enum {
|
||||
TASK_TYPE_PERSISTENT = 1,
|
||||
TASK_TYPE_TEMP,
|
||||
};
|
||||
|
||||
typedef struct STableComInfo {
|
||||
uint8_t numOfTags; // the number of tags in schema
|
||||
uint8_t precision; // the number of precision
|
||||
|
|
|
@ -362,6 +362,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_QRY_DUPLICATTED_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0718) //"Duplicatted operation")
|
||||
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) //"Task message error")
|
||||
#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A) //"Job freed")
|
||||
#define TSDB_CODE_QRY_TASK_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x071B) //"Task status error")
|
||||
|
||||
// grant
|
||||
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired")
|
||||
|
|
|
@ -650,7 +650,13 @@ TEST(testCase, agg_query_tables) {
|
|||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT_NE(pConn, nullptr);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
TAOS_RES* pRes = taos_query(pConn, "use dbv");
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tx using st tags(111111111111111)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create table, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "select count(*) from tu");
|
||||
|
|
|
@ -29,8 +29,6 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
|
||||
case TDMT_VND_QUERY_CONTINUE:
|
||||
return qWorkerProcessCQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
|
||||
case TDMT_VND_SCHEDULE_DATA_SINK:
|
||||
return qWorkerProcessDataSinkMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
|
||||
default:
|
||||
vError("unknown msg type:%d in query queue", pMsg->msgType);
|
||||
return TSDB_CODE_VND_APP_ERROR;
|
||||
|
|
|
@ -376,7 +376,7 @@ typedef struct STaskParam {
|
|||
|
||||
typedef struct SExchangeInfo {
|
||||
SArray *pSources;
|
||||
int32_t bytes; // total load bytes from remote
|
||||
uint64_t bytes; // total load bytes from remote
|
||||
tsem_t ready;
|
||||
void *pTransporter;
|
||||
SRetrieveTableRsp *pRsp;
|
||||
|
@ -385,7 +385,7 @@ typedef struct SExchangeInfo {
|
|||
|
||||
typedef struct STableScanInfo {
|
||||
void *pTsdbReadHandle;
|
||||
int32_t numOfBlocks;
|
||||
int32_t numOfBlocks; // extract basic running information.
|
||||
int32_t numOfSkipped;
|
||||
int32_t numOfBlockStatis;
|
||||
int64_t numOfRows;
|
||||
|
@ -415,7 +415,11 @@ typedef struct STagScanInfo {
|
|||
} STagScanInfo;
|
||||
|
||||
typedef struct SStreamBlockScanInfo {
|
||||
|
||||
SSDataBlock *pRes; // result SSDataBlock
|
||||
SColumnInfo *pCols; // the output column info
|
||||
uint64_t numOfRows; // total scanned rows
|
||||
uint64_t numOfExec; // execution times
|
||||
void *readerHandle;// stream block reader handle
|
||||
} SStreamBlockScanInfo;
|
||||
|
||||
typedef struct SOptrBasicInfo {
|
||||
|
@ -423,7 +427,6 @@ typedef struct SOptrBasicInfo {
|
|||
int32_t *rowCellInfoOffset; // offset value for each row result cell info
|
||||
SQLFunctionCtx *pCtx;
|
||||
SSDataBlock *pRes;
|
||||
void *keyBuf;
|
||||
} SOptrBasicInfo;
|
||||
|
||||
typedef struct SOptrBasicInfo STableIntervalOperatorInfo;
|
||||
|
|
|
@ -196,7 +196,6 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
|||
pOutput->bufStatus = updateStatus(pDispatcher);
|
||||
pthread_mutex_lock(&pDispatcher->mutex);
|
||||
pOutput->queryEnd = pDispatcher->queryEnd;
|
||||
pOutput->scheduleJobNo = 0;
|
||||
pOutput->useconds = pDispatcher->useconds;
|
||||
pOutput->precision = pDispatcher->schema.precision;
|
||||
pthread_mutex_unlock(&pDispatcher->mutex);
|
||||
|
|
|
@ -278,6 +278,19 @@ int32_t qKillTask(qTaskInfo_t qinfo) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
||||
SQInfo *pQInfo = (SQInfo *)qinfo;
|
||||
|
||||
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
|
||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
}
|
||||
|
||||
qDebug("QInfo:0x%"PRIx64" query async killed", pQInfo->qId);
|
||||
setQueryKilled(pQInfo);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
|
||||
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
|
||||
|
||||
|
|
|
@ -12,7 +12,8 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include <parser.h>
|
||||
#include "parser.h"
|
||||
#include "tq.h"
|
||||
#include "exception.h"
|
||||
#include "os.h"
|
||||
#include "tglobal.h"
|
||||
|
@ -3576,7 +3577,7 @@ void setDefaultOutputBuf_rv(SAggOperatorInfo* pAggInfo, int64_t uid, int32_t sta
|
|||
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
|
||||
|
||||
int64_t tid = 0;
|
||||
pInfo->keyBuf = realloc(pInfo->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES);
|
||||
pAggInfo->keyBuf = realloc(pAggInfo->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES);
|
||||
SResultRow* pRow = doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid, pTaskInfo, false, pAggInfo);
|
||||
|
||||
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
||||
|
@ -5061,6 +5062,42 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
|
|||
#endif
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||
|
||||
// NOTE: this operator never check if current status is done or not
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||
|
||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||
while (tqNextDataBlock(pInfo->readerHandle)) {
|
||||
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
terrno = pTaskInfo->code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (pBlockInfo->rows == 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pInfo->pRes->pDataBlock = tqRetrieveDataBlock(pInfo->readerHandle);
|
||||
if (pInfo->pRes->pDataBlock == NULL) {
|
||||
// TODO add log
|
||||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
// record the scan action.
|
||||
pInfo->numOfExec++;
|
||||
pInfo->numOfRows += pBlockInfo->rows;
|
||||
|
||||
return (pBlockInfo->rows == 0)? NULL:pInfo->pRes;
|
||||
}
|
||||
|
||||
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SExchangeInfo* pEx = (SExchangeInfo*) param;
|
||||
pEx->pRsp = pMsg->pData;
|
||||
|
@ -5263,7 +5300,6 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
|
|||
|
||||
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
tfree(pInfo);
|
||||
tfree(pOperator);
|
||||
|
@ -5371,8 +5407,26 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
SOperatorInfo* createSubmitBlockScanOperatorInfo(void *pSubmitBlockReadHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createStreamBlockScanOperatorInfo(void *pStreamBlockHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
|
||||
SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo));
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
tfree(pInfo);
|
||||
tfree(pOperator);
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pInfo->readerHandle = pStreamBlockHandle;
|
||||
|
||||
pOperator->name = "StreamBlockScanOperator";
|
||||
pOperator->operatorType = OP_StreamBlockScan;
|
||||
pOperator->blockingOptr = false;
|
||||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->exec = doStreamBlockScan;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -82,7 +82,9 @@ class FstReadMemory {
|
|||
bool init() {
|
||||
char* buf = (char*)calloc(1, sizeof(char) * _size);
|
||||
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
|
||||
if (nRead <= 0) { return false; }
|
||||
if (nRead <= 0) {
|
||||
return false;
|
||||
}
|
||||
_size = nRead;
|
||||
_s = fstSliceCreate((uint8_t*)buf, _size);
|
||||
_fst = fstCreate(&_s);
|
||||
|
@ -108,7 +110,9 @@ class FstReadMemory {
|
|||
StreamWithState* st = streamBuilderIntoStream(sb);
|
||||
StreamWithStateResult* rt = NULL;
|
||||
|
||||
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { result.push_back((uint64_t)(rt->out.out)); }
|
||||
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
|
||||
result.push_back((uint64_t)(rt->out.out));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
bool SearchWithTimeCostUs(AutomationCtx* ctx, std::vector<uint64_t>& result) {
|
||||
|
@ -184,7 +188,9 @@ void checkFstPerf() {
|
|||
delete fw;
|
||||
|
||||
FstReadMemory* m = new FstReadMemory(1024 * 64);
|
||||
if (m->init()) { printf("success to init fst read"); }
|
||||
if (m->init()) {
|
||||
printf("success to init fst read");
|
||||
}
|
||||
Performance_fstReadRecords(m);
|
||||
delete m;
|
||||
}
|
||||
|
@ -348,7 +354,9 @@ class TFileObj {
|
|||
tfileReaderDestroy(reader_);
|
||||
reader_ = NULL;
|
||||
}
|
||||
if (writer_ == NULL) { InitWriter(); }
|
||||
if (writer_ == NULL) {
|
||||
InitWriter();
|
||||
}
|
||||
return tfileWriterPut(writer_, tv, false);
|
||||
}
|
||||
bool InitWriter() {
|
||||
|
@ -388,8 +396,12 @@ class TFileObj {
|
|||
return tfileReaderSearch(reader_, query, result);
|
||||
}
|
||||
~TFileObj() {
|
||||
if (writer_) { tfileWriterDestroy(writer_); }
|
||||
if (reader_) { tfileReaderDestroy(reader_); }
|
||||
if (writer_) {
|
||||
tfileWriterDestroy(writer_);
|
||||
}
|
||||
if (reader_) {
|
||||
tfileReaderDestroy(reader_);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -912,7 +924,8 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
|
|||
}
|
||||
TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
|
||||
std::string path = "/tmp/cache_and_tfile";
|
||||
if (index->Init(path) != 0) {}
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
|
||||
std::thread threads[NUM_OF_THREAD];
|
||||
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
||||
|
@ -927,14 +940,24 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
|
|||
|
||||
TEST_F(IndexEnv2, testIndex_restart) {
|
||||
std::string path = "/tmp/cache_and_tfile";
|
||||
if (index->Init(path) != 0) {}
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
index->SearchOneTarget("tag1", "Hello", 10);
|
||||
index->SearchOneTarget("tag2", "Test", 10);
|
||||
}
|
||||
TEST_F(IndexEnv2, testIndex_restart1) {
|
||||
std::string path = "/tmp/cache_and_tfile";
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
index->ReadMultiMillonData("tag1", "coding");
|
||||
index->SearchOneTarget("tag1", "Hello", 10);
|
||||
index->SearchOneTarget("tag2", "Test", 10);
|
||||
}
|
||||
|
||||
TEST_F(IndexEnv2, testIndex_read_performance) {
|
||||
std::string path = "/tmp/cache_and_tfile";
|
||||
if (index->Init(path) != 0) {}
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
index->PutOneTarge("tag1", "Hello", 12);
|
||||
index->PutOneTarge("tag1", "Hello", 15);
|
||||
index->ReadMultiMillonData("tag1", "Hello");
|
||||
|
@ -943,17 +966,84 @@ TEST_F(IndexEnv2, testIndex_read_performance) {
|
|||
}
|
||||
TEST_F(IndexEnv2, testIndexMultiTag) {
|
||||
std::string path = "/tmp/multi_tag";
|
||||
if (index->Init(path) != 0) {}
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int32_t num = 1000 * 10000;
|
||||
index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num);
|
||||
std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl;
|
||||
// index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
|
||||
}
|
||||
TEST_F(IndexEnv2, testLongComVal) {
|
||||
TEST_F(IndexEnv2, testLongComVal1) {
|
||||
std::string path = "/tmp/long_colVal";
|
||||
if (index->Init(path) != 0) {}
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
// gen colVal by randstr
|
||||
std::string randstr = "xxxxxxxxxxxxxxxxx";
|
||||
index->WriteMultiMillonData("tag1", randstr, 100 * 10000);
|
||||
}
|
||||
|
||||
TEST_F(IndexEnv2, testLongComVal2) {
|
||||
std::string path = "/tmp/long_colVal";
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
// gen colVal by randstr
|
||||
std::string randstr = "abcccc fdadfafdafda";
|
||||
index->WriteMultiMillonData("tag1", randstr, 100 * 10000);
|
||||
}
|
||||
TEST_F(IndexEnv2, testLongComVal3) {
|
||||
std::string path = "/tmp/long_colVal";
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
// gen colVal by randstr
|
||||
std::string randstr = "Yes, coding and coding and coding";
|
||||
index->WriteMultiMillonData("tag1", randstr, 100 * 10000);
|
||||
}
|
||||
TEST_F(IndexEnv2, testLongComVal4) {
|
||||
std::string path = "/tmp/long_colVal";
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
// gen colVal by randstr
|
||||
std::string randstr = "111111 bac fdadfa";
|
||||
index->WriteMultiMillonData("tag1", randstr, 100 * 10000);
|
||||
}
|
||||
TEST_F(IndexEnv2, testIndex_read_performance1) {
|
||||
std::string path = "/tmp/cache_and_tfile";
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
index->PutOneTarge("tag1", "Hello", 12);
|
||||
index->PutOneTarge("tag1", "Hello", 15);
|
||||
index->ReadMultiMillonData("tag1", "Hello", 1000);
|
||||
std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
|
||||
assert(3 == index->SearchOne("tag1", "Hello"));
|
||||
}
|
||||
TEST_F(IndexEnv2, testIndex_read_performance2) {
|
||||
std::string path = "/tmp/cache_and_tfile";
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
index->PutOneTarge("tag1", "Hello", 12);
|
||||
index->PutOneTarge("tag1", "Hello", 15);
|
||||
index->ReadMultiMillonData("tag1", "Hello", 1000 * 10);
|
||||
std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
|
||||
assert(3 == index->SearchOne("tag1", "Hello"));
|
||||
}
|
||||
TEST_F(IndexEnv2, testIndex_read_performance3) {
|
||||
std::string path = "/tmp/cache_and_tfile";
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
index->PutOneTarge("tag1", "Hello", 12);
|
||||
index->PutOneTarge("tag1", "Hello", 15);
|
||||
index->ReadMultiMillonData("tag1", "Hello", 1000 * 100);
|
||||
std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
|
||||
assert(3 == index->SearchOne("tag1", "Hello"));
|
||||
}
|
||||
TEST_F(IndexEnv2, testIndex_read_performance4) {
|
||||
std::string path = "/tmp/cache_and_tfile";
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
index->PutOneTarge("tag10", "Hello", 12);
|
||||
index->PutOneTarge("tag12", "Hello", 15);
|
||||
index->ReadMultiMillonData("tag10", "Hello", 1000 * 100);
|
||||
std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
|
||||
assert(3 == index->SearchOne("tag10", "Hello"));
|
||||
}
|
||||
|
|
|
@ -680,10 +680,9 @@ int32_t doCheckAndBuildCreateTableReq(SCreateTableSql* pCreateTable, SParseConte
|
|||
|
||||
serializeVgroupTablesBatchImpl(&tbatch, pBufArray);
|
||||
destroyCreateTbReqBatch(&tbatch);
|
||||
|
||||
} else { // it is a child table, created according to a super table
|
||||
code = doCheckAndBuildCreateCTableReq(pCreateTable, pCtx, pMsgBuf, &pBufArray);
|
||||
if (code != 0) {
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1656,7 +1656,7 @@ static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, cha
|
|||
}
|
||||
|
||||
// Remove quotation marks
|
||||
if (TK_STRING == type) {
|
||||
if (TSDB_DATA_TYPE_BINARY == type) {
|
||||
if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
|
||||
return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
|
||||
}
|
||||
|
|
|
@ -31,8 +31,6 @@ enum {
|
|||
QW_PHASE_POST_QUERY,
|
||||
QW_PHASE_PRE_CQUERY,
|
||||
QW_PHASE_POST_CQUERY,
|
||||
QW_PHASE_PRE_SINK,
|
||||
QW_PHASE_POST_SINK,
|
||||
QW_PHASE_PRE_FETCH,
|
||||
QW_PHASE_POST_FETCH,
|
||||
};
|
||||
|
@ -86,6 +84,7 @@ typedef struct SQWMsg {
|
|||
|
||||
typedef struct SQWPhaseInput {
|
||||
int8_t status;
|
||||
int8_t taskType;
|
||||
int32_t code;
|
||||
qTaskInfo_t taskHandle;
|
||||
DataSinkHandle sinkHandle;
|
||||
|
@ -93,8 +92,7 @@ typedef struct SQWPhaseInput {
|
|||
|
||||
typedef struct SQWPhaseOutput {
|
||||
int32_t rspCode;
|
||||
bool needStop;
|
||||
bool needRsp;
|
||||
bool needStop;
|
||||
} SQWPhaseOutput;
|
||||
|
||||
|
||||
|
@ -105,10 +103,17 @@ typedef struct SQWTaskStatus {
|
|||
|
||||
typedef struct SQWTaskCtx {
|
||||
SRWLatch lock;
|
||||
int32_t phase;
|
||||
int8_t phase;
|
||||
int8_t taskType;
|
||||
|
||||
void *readyConnection;
|
||||
void *dropConnection;
|
||||
void *cancelConnection;
|
||||
|
||||
int32_t sinkId;
|
||||
int32_t readyCode;
|
||||
bool emptyRes;
|
||||
int8_t queryContinue;
|
||||
int8_t queryInQueue;
|
||||
int32_t rspCode;
|
||||
|
||||
int8_t events[QW_EVENT_MAX];
|
||||
|
||||
|
@ -144,7 +149,11 @@ typedef struct SQWorkerMgmt {
|
|||
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
|
||||
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
|
||||
|
||||
#define QW_IN_EXECUTOR(ctx) ((ctx)->phase == QW_PHASE_PRE_QUERY || (ctx)->phase == QW_PHASE_PRE_CQUERY || (ctx)->phase == QW_PHASE_PRE_FETCH || (ctx)->phase == QW_PHASE_PRE_SINK)
|
||||
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
|
||||
|
||||
#define QW_SET_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
|
||||
|
||||
#define QW_IN_EXECUTOR(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_FETCH)
|
||||
|
||||
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
|
||||
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
|
||||
|
@ -166,6 +175,10 @@ typedef struct SQWorkerMgmt {
|
|||
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
|
||||
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
|
||||
|
||||
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
|
||||
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
|
||||
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
|
||||
|
||||
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
|
||||
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
|
||||
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
|
||||
|
|
|
@ -23,7 +23,7 @@ extern "C" {
|
|||
#include "qworkerInt.h"
|
||||
#include "dataSinkMgt.h"
|
||||
|
||||
int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
|
||||
int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg, int8_t taskType);
|
||||
int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
|
||||
int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
|
||||
int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -229,42 +229,6 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection) {
|
||||
SRpcMsg *pMsg = (SRpcMsg *)connection;
|
||||
SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq));
|
||||
if (NULL == req) {
|
||||
qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq));
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
req->header.vgId = mgmt->nodeId;
|
||||
req->sId = sId;
|
||||
req->queryId = qId;
|
||||
req->taskId = tId;
|
||||
|
||||
SRpcMsg pNewMsg = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.msgType = TDMT_VND_SCHEDULE_DATA_SINK,
|
||||
.pCont = req,
|
||||
.contLen = sizeof(SSinkDataReq),
|
||||
.code = 0,
|
||||
};
|
||||
|
||||
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
qError("put data sink schedule msg to queue failed, code:%x", code);
|
||||
rpcFreeCont(req);
|
||||
QW_ERR_RET(code);
|
||||
}
|
||||
|
||||
qDebug("put data sink schedule msg to query queue");
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection) {
|
||||
SRpcMsg *pMsg = (SRpcMsg *)connection;
|
||||
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
|
||||
|
@ -326,7 +290,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
QW_SCH_TASK_DLOG("processQuery start, node:%p", node);
|
||||
|
||||
QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg));
|
||||
QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType));
|
||||
|
||||
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
|
||||
|
||||
|
@ -366,25 +330,6 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SSinkDataReq *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
qError("invalid sink data msg");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
//dsScheduleProcess();
|
||||
//TODO
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
|
@ -429,7 +374,9 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||
msg->sId = htobe64(msg->sId);
|
||||
uint64_t sId = msg->sId;
|
||||
|
||||
SSchedulerStatusRsp *sStatus = NULL;
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
|||
ADD_EXECUTABLE(qworkerTest ${SOURCE_LIST})
|
||||
TARGET_LINK_LIBRARIES(
|
||||
qworkerTest
|
||||
PUBLIC os util common transport gtest qcom planner qworker
|
||||
PUBLIC os util common transport gtest qcom planner qworker executor
|
||||
)
|
||||
|
||||
TARGET_INCLUDE_DIRECTORIES(
|
||||
|
|
|
@ -32,11 +32,75 @@
|
|||
#include "qworker.h"
|
||||
#include "stub.h"
|
||||
#include "addr_any.h"
|
||||
#include "executor.h"
|
||||
#include "dataSinkMgt.h"
|
||||
|
||||
|
||||
namespace {
|
||||
|
||||
bool testStop = false;
|
||||
bool qwtTestEnableSleep = true;
|
||||
bool qwtTestStop = false;
|
||||
bool qwtTestDeadLoop = true;
|
||||
int32_t qwtTestMTRunSec = 10;
|
||||
int32_t qwtTestPrintNum = 100000;
|
||||
int32_t qwtTestCaseIdx = 0;
|
||||
int32_t qwtTestCaseNum = 4;
|
||||
|
||||
void qwtInitLogFile() {
|
||||
const char *defaultLogFileNamePrefix = "taosdlog";
|
||||
const int32_t maxLogFileNum = 10;
|
||||
|
||||
tsAsyncLog = 0;
|
||||
qDebugFlag = 159;
|
||||
|
||||
char temp[128] = {0};
|
||||
sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix);
|
||||
if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) {
|
||||
printf("failed to open log file in directory:%s\n", tsLogDir);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
|
||||
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
|
||||
queryMsg->queryId = htobe64(1);
|
||||
queryMsg->sId = htobe64(1);
|
||||
queryMsg->taskId = htobe64(1);
|
||||
queryMsg->contentLen = htonl(100);
|
||||
queryRpc->pCont = queryMsg;
|
||||
queryRpc->contLen = sizeof(SSubQueryMsg) + 100;
|
||||
}
|
||||
|
||||
void qwtBuildReadyReqMsg(SResReadyReq *readyMsg, SRpcMsg *readyRpc) {
|
||||
readyMsg->sId = htobe64(1);
|
||||
readyMsg->queryId = htobe64(1);
|
||||
readyMsg->taskId = htobe64(1);
|
||||
readyRpc->pCont = readyMsg;
|
||||
readyRpc->contLen = sizeof(SResReadyReq);
|
||||
}
|
||||
|
||||
void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
|
||||
fetchMsg->sId = htobe64(1);
|
||||
fetchMsg->queryId = htobe64(1);
|
||||
fetchMsg->taskId = htobe64(1);
|
||||
fetchRpc->pCont = fetchMsg;
|
||||
fetchRpc->contLen = sizeof(SResFetchReq);
|
||||
}
|
||||
|
||||
void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
|
||||
dropMsg->sId = htobe64(1);
|
||||
dropMsg->queryId = htobe64(1);
|
||||
dropMsg->taskId = htobe64(1);
|
||||
dropRpc->pCont = dropMsg;
|
||||
dropRpc->contLen = sizeof(STaskDropReq);
|
||||
}
|
||||
|
||||
void qwtBuildStatusReqMsg(SSchTasksStatusReq *statusMsg, SRpcMsg *statusRpc) {
|
||||
statusMsg->sId = htobe64(1);
|
||||
statusRpc->pCont = statusMsg;
|
||||
statusRpc->contLen = sizeof(SSchTasksStatusReq);
|
||||
statusRpc->msgType = TDMT_VND_TASKS_STATUS;
|
||||
}
|
||||
|
||||
int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
|
||||
return 0;
|
||||
|
@ -48,6 +112,7 @@ int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) {
|
|||
|
||||
|
||||
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
||||
/*
|
||||
if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) {
|
||||
SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont;
|
||||
printf("task num:%d\n", rsp->num);
|
||||
|
@ -56,9 +121,63 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
|||
printf("qId:%"PRIx64",tId:%"PRIx64",status:%d\n", task->queryId, task->taskId, task->status);
|
||||
}
|
||||
}
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
|
||||
int32_t idx = qwtTestCaseIdx % qwtTestCaseNum;
|
||||
|
||||
if (0 == idx) {
|
||||
*pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx;
|
||||
*handle = (DataSinkHandle)qwtTestCaseIdx+1;
|
||||
} else if (1 == idx) {
|
||||
*pTaskInfo = NULL;
|
||||
*handle = NULL;
|
||||
} else if (2 == idx) {
|
||||
*pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx;
|
||||
*handle = NULL;
|
||||
} else if (3 == idx) {
|
||||
*pTaskInfo = NULL;
|
||||
*handle = (DataSinkHandle)qwtTestCaseIdx;
|
||||
}
|
||||
|
||||
++qwtTestCaseIdx;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t qwtKillTask(qTaskInfo_t qinfo) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
void qwtDestroyTask(qTaskInfo_t qHandle) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
void qwtEndPut(DataSinkHandle handle, uint64_t useconds) {
|
||||
}
|
||||
|
||||
void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
|
||||
}
|
||||
|
||||
int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
void qwtDestroyDataSinker(DataSinkHandle handle) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
void stubSetStringToPlan() {
|
||||
|
@ -74,11 +193,118 @@ void stubSetStringToPlan() {
|
|||
}
|
||||
}
|
||||
|
||||
void stubSetExecTask() {
|
||||
static Stub stub;
|
||||
stub.set(qExecTask, qwtExecTask);
|
||||
{
|
||||
AddrAny any("libexecutor.so");
|
||||
std::map<std::string,void*> result;
|
||||
any.get_global_func_addr_dynsym("^qExecTask$", result);
|
||||
for (const auto& f : result) {
|
||||
stub.set(f.second, qwtExecTask);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
void stubSetCreateExecTask() {
|
||||
static Stub stub;
|
||||
stub.set(qCreateExecTask, qwtCreateExecTask);
|
||||
{
|
||||
AddrAny any("libexecutor.so");
|
||||
std::map<std::string,void*> result;
|
||||
any.get_global_func_addr_dynsym("^qCreateExecTask$", result);
|
||||
for (const auto& f : result) {
|
||||
stub.set(f.second, qwtCreateExecTask);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void stubSetAsyncKillTask() {
|
||||
static Stub stub;
|
||||
stub.set(qAsyncKillTask, qwtKillTask);
|
||||
{
|
||||
AddrAny any("libexecutor.so");
|
||||
std::map<std::string,void*> result;
|
||||
any.get_global_func_addr_dynsym("^qAsyncKillTask$", result);
|
||||
for (const auto& f : result) {
|
||||
stub.set(f.second, qwtKillTask);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void stubSetDestroyTask() {
|
||||
static Stub stub;
|
||||
stub.set(qDestroyTask, qwtDestroyTask);
|
||||
{
|
||||
AddrAny any("libexecutor.so");
|
||||
std::map<std::string,void*> result;
|
||||
any.get_global_func_addr_dynsym("^qDestroyTask$", result);
|
||||
for (const auto& f : result) {
|
||||
stub.set(f.second, qwtDestroyTask);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void stubSetDestroyDataSinker() {
|
||||
static Stub stub;
|
||||
stub.set(dsDestroyDataSinker, qwtDestroyDataSinker);
|
||||
{
|
||||
AddrAny any("libexecutor.so");
|
||||
std::map<std::string,void*> result;
|
||||
any.get_global_func_addr_dynsym("^dsDestroyDataSinker$", result);
|
||||
for (const auto& f : result) {
|
||||
stub.set(f.second, qwtDestroyDataSinker);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void stubSetGetDataLength() {
|
||||
static Stub stub;
|
||||
stub.set(dsGetDataLength, qwtGetDataLength);
|
||||
{
|
||||
AddrAny any("libexecutor.so");
|
||||
std::map<std::string,void*> result;
|
||||
any.get_global_func_addr_dynsym("^dsGetDataLength$", result);
|
||||
for (const auto& f : result) {
|
||||
stub.set(f.second, qwtGetDataLength);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void stubSetEndPut() {
|
||||
static Stub stub;
|
||||
stub.set(dsEndPut, qwtEndPut);
|
||||
{
|
||||
AddrAny any("libexecutor.so");
|
||||
std::map<std::string,void*> result;
|
||||
any.get_global_func_addr_dynsym("^dsEndPut$", result);
|
||||
for (const auto& f : result) {
|
||||
stub.set(f.second, qwtEndPut);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void stubSetPutDataBlock() {
|
||||
static Stub stub;
|
||||
stub.set(dsPutDataBlock, qwtPutDataBlock);
|
||||
{
|
||||
AddrAny any("libexecutor.so");
|
||||
std::map<std::string,void*> result;
|
||||
any.get_global_func_addr_dynsym("^dsPutDataBlock$", result);
|
||||
for (const auto& f : result) {
|
||||
stub.set(f.second, qwtPutDataBlock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void stubSetRpcSendResponse() {
|
||||
static Stub stub;
|
||||
stub.set(rpcSendResponse, qwtRpcSendResponse);
|
||||
{
|
||||
AddrAny any("libplanner.so");
|
||||
AddrAny any("libtransport.so");
|
||||
std::map<std::string,void*> result;
|
||||
any.get_global_func_addr_dynsym("^rpcSendResponse$", result);
|
||||
for (const auto& f : result) {
|
||||
|
@ -87,24 +313,35 @@ void stubSetRpcSendResponse() {
|
|||
}
|
||||
}
|
||||
|
||||
void stubSetGetDataBlock() {
|
||||
static Stub stub;
|
||||
stub.set(dsGetDataBlock, qwtGetDataBlock);
|
||||
{
|
||||
AddrAny any("libtransport.so");
|
||||
std::map<std::string,void*> result;
|
||||
any.get_global_func_addr_dynsym("^dsGetDataBlock$", result);
|
||||
for (const auto& f : result) {
|
||||
stub.set(f.second, qwtGetDataBlock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void *queryThread(void *param) {
|
||||
SRpcMsg queryRpc = {0};
|
||||
int32_t code = 0;
|
||||
uint32_t n = 0;
|
||||
void *mockPointer = (void *)0x1;
|
||||
void *mgmt = param;
|
||||
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
|
||||
queryMsg->queryId = htobe64(1);
|
||||
queryMsg->sId = htobe64(1);
|
||||
queryMsg->taskId = htobe64(1);
|
||||
queryMsg->contentLen = htonl(100);
|
||||
queryRpc.pCont = queryMsg;
|
||||
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
|
||||
|
||||
while (!testStop) {
|
||||
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
|
||||
usleep(rand()%5);
|
||||
if (++n % 50000 == 0) {
|
||||
while (!qwtTestStop) {
|
||||
qwtBuildQueryReqMsg(&queryRpc);
|
||||
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
|
||||
free(queryRpc.pCont);
|
||||
if (qwtTestEnableSleep) {
|
||||
usleep(rand()%5);
|
||||
}
|
||||
if (++n % qwtTestPrintNum == 0) {
|
||||
printf("query:%d\n", n);
|
||||
}
|
||||
}
|
||||
|
@ -119,16 +356,14 @@ void *readyThread(void *param) {
|
|||
void *mockPointer = (void *)0x1;
|
||||
void *mgmt = param;
|
||||
SResReadyReq readyMsg = {0};
|
||||
readyMsg.sId = htobe64(1);
|
||||
readyMsg.queryId = htobe64(1);
|
||||
readyMsg.taskId = htobe64(1);
|
||||
readyRpc.pCont = &readyMsg;
|
||||
readyRpc.contLen = sizeof(SResReadyReq);
|
||||
|
||||
while (!testStop) {
|
||||
while (!qwtTestStop) {
|
||||
qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
|
||||
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
|
||||
usleep(rand()%5);
|
||||
if (++n % 50000 == 0) {
|
||||
if (qwtTestEnableSleep) {
|
||||
usleep(rand()%5);
|
||||
}
|
||||
if (++n % qwtTestPrintNum == 0) {
|
||||
printf("ready:%d\n", n);
|
||||
}
|
||||
}
|
||||
|
@ -143,16 +378,14 @@ void *fetchThread(void *param) {
|
|||
void *mockPointer = (void *)0x1;
|
||||
void *mgmt = param;
|
||||
SResFetchReq fetchMsg = {0};
|
||||
fetchMsg.sId = htobe64(1);
|
||||
fetchMsg.queryId = htobe64(1);
|
||||
fetchMsg.taskId = htobe64(1);
|
||||
fetchRpc.pCont = &fetchMsg;
|
||||
fetchRpc.contLen = sizeof(SResFetchReq);
|
||||
|
||||
while (!testStop) {
|
||||
while (!qwtTestStop) {
|
||||
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
|
||||
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
|
||||
usleep(rand()%5);
|
||||
if (++n % 50000 == 0) {
|
||||
if (qwtTestEnableSleep) {
|
||||
usleep(rand()%5);
|
||||
}
|
||||
if (++n % qwtTestPrintNum == 0) {
|
||||
printf("fetch:%d\n", n);
|
||||
}
|
||||
}
|
||||
|
@ -167,16 +400,14 @@ void *dropThread(void *param) {
|
|||
void *mockPointer = (void *)0x1;
|
||||
void *mgmt = param;
|
||||
STaskDropReq dropMsg = {0};
|
||||
dropMsg.sId = htobe64(1);
|
||||
dropMsg.queryId = htobe64(1);
|
||||
dropMsg.taskId = htobe64(1);
|
||||
dropRpc.pCont = &dropMsg;
|
||||
dropRpc.contLen = sizeof(STaskDropReq);
|
||||
|
||||
while (!testStop) {
|
||||
while (!qwtTestStop) {
|
||||
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
|
||||
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
|
||||
usleep(rand()%5);
|
||||
if (++n % 50000 == 0) {
|
||||
if (qwtTestEnableSleep) {
|
||||
usleep(rand()%5);
|
||||
}
|
||||
if (++n % qwtTestPrintNum == 0) {
|
||||
printf("drop:%d\n", n);
|
||||
}
|
||||
}
|
||||
|
@ -191,16 +422,14 @@ void *statusThread(void *param) {
|
|||
void *mockPointer = (void *)0x1;
|
||||
void *mgmt = param;
|
||||
SSchTasksStatusReq statusMsg = {0};
|
||||
statusMsg.sId = htobe64(1);
|
||||
statusRpc.pCont = &statusMsg;
|
||||
statusRpc.contLen = sizeof(SSchTasksStatusReq);
|
||||
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
|
||||
|
||||
while (!testStop) {
|
||||
statusMsg.sId = htobe64(1);
|
||||
while (!qwtTestStop) {
|
||||
qwtBuildStatusReqMsg(&statusMsg, &statusRpc);
|
||||
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
|
||||
usleep(rand()%5);
|
||||
if (++n % 50000 == 0) {
|
||||
if (qwtTestEnableSleep) {
|
||||
usleep(rand()%5);
|
||||
}
|
||||
if (++n % qwtTestPrintNum == 0) {
|
||||
printf("status:%d\n", n);
|
||||
}
|
||||
}
|
||||
|
@ -209,6 +438,35 @@ void *statusThread(void *param) {
|
|||
}
|
||||
|
||||
|
||||
void *controlThread(void *param) {
|
||||
SRpcMsg queryRpc = {0};
|
||||
int32_t code = 0;
|
||||
uint32_t n = 0;
|
||||
void *mockPointer = (void *)0x1;
|
||||
void *mgmt = param;
|
||||
|
||||
while (!qwtTestStop) {
|
||||
qwtBuildQueryReqMsg(&queryRpc);
|
||||
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
|
||||
free(queryRpc.pCont);
|
||||
if (qwtTestEnableSleep) {
|
||||
usleep(rand()%5);
|
||||
}
|
||||
if (++n % qwtTestPrintNum == 0) {
|
||||
printf("query:%d\n", n);
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *queryQueueThread(void *param) {
|
||||
|
||||
}
|
||||
|
||||
void *fetchQueueThread(void *param) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
@ -224,6 +482,8 @@ TEST(seqTest, normalCase) {
|
|||
SRpcMsg fetchRpc = {0};
|
||||
SRpcMsg dropRpc = {0};
|
||||
SRpcMsg statusRpc = {0};
|
||||
|
||||
qwtInitLogFile();
|
||||
|
||||
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
|
||||
queryMsg->queryId = htobe64(1);
|
||||
|
@ -262,6 +522,15 @@ TEST(seqTest, normalCase) {
|
|||
|
||||
stubSetStringToPlan();
|
||||
stubSetRpcSendResponse();
|
||||
stubSetExecTask();
|
||||
stubSetCreateExecTask();
|
||||
stubSetAsyncKillTask();
|
||||
stubSetDestroyTask();
|
||||
stubSetDestroyDataSinker();
|
||||
stubSetGetDataLength();
|
||||
stubSetEndPut();
|
||||
stubSetPutDataBlock();
|
||||
stubSetGetDataBlock();
|
||||
|
||||
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
@ -308,6 +577,8 @@ TEST(seqTest, cancelFirst) {
|
|||
SRpcMsg queryRpc = {0};
|
||||
SRpcMsg dropRpc = {0};
|
||||
SRpcMsg statusRpc = {0};
|
||||
|
||||
qwtInitLogFile();
|
||||
|
||||
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
|
||||
queryMsg->queryId = htobe64(1);
|
||||
|
@ -348,7 +619,7 @@ TEST(seqTest, cancelFirst) {
|
|||
ASSERT_EQ(code, 0);
|
||||
|
||||
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(code, TSDB_CODE_QRY_TASK_DROPPED);
|
||||
|
||||
statusMsg.sId = htobe64(1);
|
||||
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
|
||||
|
@ -366,44 +637,16 @@ TEST(seqTest, randCase) {
|
|||
SRpcMsg fetchRpc = {0};
|
||||
SRpcMsg dropRpc = {0};
|
||||
SRpcMsg statusRpc = {0};
|
||||
|
||||
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
|
||||
queryMsg->queryId = htobe64(1);
|
||||
queryMsg->sId = htobe64(1);
|
||||
queryMsg->taskId = htobe64(1);
|
||||
queryMsg->contentLen = htonl(100);
|
||||
queryRpc.pCont = queryMsg;
|
||||
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
|
||||
|
||||
SResReadyReq readyMsg = {0};
|
||||
readyMsg.sId = htobe64(1);
|
||||
readyMsg.queryId = htobe64(1);
|
||||
readyMsg.taskId = htobe64(1);
|
||||
readyRpc.pCont = &readyMsg;
|
||||
readyRpc.contLen = sizeof(SResReadyReq);
|
||||
|
||||
SResFetchReq fetchMsg = {0};
|
||||
fetchMsg.sId = htobe64(1);
|
||||
fetchMsg.queryId = htobe64(1);
|
||||
fetchMsg.taskId = htobe64(1);
|
||||
fetchRpc.pCont = &fetchMsg;
|
||||
fetchRpc.contLen = sizeof(SResFetchReq);
|
||||
|
||||
STaskDropReq dropMsg = {0};
|
||||
dropMsg.sId = htobe64(1);
|
||||
dropMsg.queryId = htobe64(1);
|
||||
dropMsg.taskId = htobe64(1);
|
||||
dropRpc.pCont = &dropMsg;
|
||||
dropRpc.contLen = sizeof(STaskDropReq);
|
||||
|
||||
SSchTasksStatusReq statusMsg = {0};
|
||||
statusMsg.sId = htobe64(1);
|
||||
statusRpc.pCont = &statusMsg;
|
||||
statusRpc.contLen = sizeof(SSchTasksStatusReq);
|
||||
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
|
||||
|
||||
qwtInitLogFile();
|
||||
|
||||
stubSetStringToPlan();
|
||||
stubSetRpcSendResponse();
|
||||
stubSetCreateExecTask();
|
||||
|
||||
srand(time(NULL));
|
||||
|
||||
|
@ -416,20 +659,25 @@ TEST(seqTest, randCase) {
|
|||
int32_t r = rand() % maxr;
|
||||
|
||||
if (r >= 0 && r < maxr/5) {
|
||||
printf("Query,%d\n", t++);
|
||||
printf("Query,%d\n", t++);
|
||||
qwtBuildQueryReqMsg(&queryRpc);
|
||||
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
|
||||
free(queryRpc.pCont);
|
||||
} else if (r >= maxr/5 && r < maxr * 2/5) {
|
||||
printf("Ready,%d\n", t++);
|
||||
qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
|
||||
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
|
||||
} else if (r >= maxr * 2/5 && r < maxr* 3/5) {
|
||||
printf("Fetch,%d\n", t++);
|
||||
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
|
||||
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
|
||||
} else if (r >= maxr * 3/5 && r < maxr * 4/5) {
|
||||
printf("Drop,%d\n", t++);
|
||||
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
|
||||
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
|
||||
} else if (r >= maxr * 4/5 && r < maxr-1) {
|
||||
printf("Status,%d\n", t++);
|
||||
statusMsg.sId = htobe64(1);
|
||||
qwtBuildStatusReqMsg(&statusMsg, &statusRpc);
|
||||
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
|
||||
ASSERT_EQ(code, 0);
|
||||
} else {
|
||||
|
@ -445,6 +693,8 @@ TEST(seqTest, multithreadRand) {
|
|||
void *mgmt = NULL;
|
||||
int32_t code = 0;
|
||||
void *mockPointer = (void *)0x1;
|
||||
|
||||
qwtInitLogFile();
|
||||
|
||||
stubSetStringToPlan();
|
||||
stubSetRpcSendResponse();
|
||||
|
@ -464,15 +714,69 @@ TEST(seqTest, multithreadRand) {
|
|||
pthread_create(&(t4), &thattr, dropThread, NULL);
|
||||
pthread_create(&(t5), &thattr, statusThread, NULL);
|
||||
|
||||
int32_t t = 0;
|
||||
int32_t maxr = 10001;
|
||||
sleep(300);
|
||||
testStop = true;
|
||||
sleep(1);
|
||||
while (true) {
|
||||
if (qwtTestDeadLoop) {
|
||||
sleep(1);
|
||||
} else {
|
||||
sleep(qwtTestMTRunSec);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
qwtTestStop = true;
|
||||
sleep(3);
|
||||
|
||||
qWorkerDestroy(&mgmt);
|
||||
}
|
||||
|
||||
TEST(rcTest, multithread) {
|
||||
void *mgmt = NULL;
|
||||
int32_t code = 0;
|
||||
void *mockPointer = (void *)0x1;
|
||||
|
||||
qwtInitLogFile();
|
||||
|
||||
stubSetStringToPlan();
|
||||
stubSetRpcSendResponse();
|
||||
stubSetExecTask();
|
||||
stubSetCreateExecTask();
|
||||
stubSetAsyncKillTask();
|
||||
stubSetDestroyTask();
|
||||
stubSetDestroyDataSinker();
|
||||
stubSetGetDataLength();
|
||||
stubSetEndPut();
|
||||
stubSetPutDataBlock();
|
||||
stubSetGetDataBlock();
|
||||
|
||||
srand(time(NULL));
|
||||
|
||||
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
pthread_attr_t thattr;
|
||||
pthread_attr_init(&thattr);
|
||||
|
||||
pthread_t t1,t2,t3,t4,t5;
|
||||
pthread_create(&(t1), &thattr, controlThread, mgmt);
|
||||
pthread_create(&(t2), &thattr, queryQueueThread, NULL);
|
||||
pthread_create(&(t3), &thattr, fetchQueueThread, NULL);
|
||||
|
||||
while (true) {
|
||||
if (qwtTestDeadLoop) {
|
||||
sleep(1);
|
||||
} else {
|
||||
sleep(qwtTestMTRunSec);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
qwtTestStop = true;
|
||||
sleep(3);
|
||||
|
||||
qWorkerDestroy(&mgmt);
|
||||
}
|
||||
|
||||
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
|
|
|
@ -1085,6 +1085,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
pMsg->sId = htobe64(schMgmt.sId);
|
||||
pMsg->queryId = htobe64(pJob->queryId);
|
||||
pMsg->taskId = htobe64(pTask->taskId);
|
||||
pMsg->taskType = TASK_TYPE_TEMP;
|
||||
pMsg->contentLen = htonl(pTask->msgLen);
|
||||
memcpy(pMsg->msg, pTask->msg, pTask->msgLen);
|
||||
break;
|
||||
|
@ -1471,6 +1472,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag *pDag, SArray **pTasks) {
|
|||
pMsg->sId = htobe64(schMgmt.sId);
|
||||
pMsg->queryId = htobe64(plan->id.queryId);
|
||||
pMsg->taskId = htobe64(schGenUUID());
|
||||
pMsg->taskType = TASK_TYPE_PERSISTENT;
|
||||
pMsg->contentLen = htonl(msgLen);
|
||||
memcpy(pMsg->msg, msg, msgLen);
|
||||
|
||||
|
|
|
@ -102,38 +102,110 @@ typedef void* queue[2];
|
|||
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
|
||||
|
||||
typedef struct {
|
||||
SRpcInfo* pRpc; // associated SRpcInfo
|
||||
SEpSet epSet; // ip list provided by app
|
||||
void* ahandle; // handle provided by app
|
||||
struct SRpcConn* pConn; // pConn allocated
|
||||
tmsg_t msgType; // message type
|
||||
uint8_t* pCont; // content provided by app
|
||||
int32_t contLen; // content length
|
||||
int32_t code; // error code
|
||||
int16_t numOfTry; // number of try for different servers
|
||||
int8_t oldInUse; // server EP inUse passed by app
|
||||
int8_t redirect; // flag to indicate redirect
|
||||
int8_t connType; // connection type
|
||||
int64_t rid; // refId returned by taosAddRef
|
||||
SRpcMsg* pRsp; // for synchronous API
|
||||
tsem_t* pSem; // for synchronous API
|
||||
SEpSet* pSet; // for synchronous API
|
||||
char msg[0]; // RpcHead starts from here
|
||||
SRpcInfo* pRpc; // associated SRpcInfo
|
||||
SEpSet epSet; // ip list provided by app
|
||||
void* ahandle; // handle provided by app
|
||||
// struct SRpcConn* pConn; // pConn allocated
|
||||
tmsg_t msgType; // message type
|
||||
uint8_t* pCont; // content provided by app
|
||||
int32_t contLen; // content length
|
||||
// int32_t code; // error code
|
||||
// int16_t numOfTry; // number of try for different servers
|
||||
// int8_t oldInUse; // server EP inUse passed by app
|
||||
// int8_t redirect; // flag to indicate redirect
|
||||
int8_t connType; // connection type
|
||||
int64_t rid; // refId returned by taosAddRef
|
||||
SRpcMsg* pRsp; // for synchronous API
|
||||
tsem_t* pSem; // for synchronous API
|
||||
char* ip;
|
||||
uint32_t port;
|
||||
// SEpSet* pSet; // for synchronous API
|
||||
} SRpcReqContext;
|
||||
|
||||
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
||||
#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext))
|
||||
typedef struct {
|
||||
SRpcInfo* pRpc; // associated SRpcInfo
|
||||
SEpSet epSet; // ip list provided by app
|
||||
void* ahandle; // handle provided by app
|
||||
// struct SRpcConn* pConn; // pConn allocated
|
||||
tmsg_t msgType; // message type
|
||||
uint8_t* pCont; // content provided by app
|
||||
int32_t contLen; // content length
|
||||
// int32_t code; // error code
|
||||
// int16_t numOfTry; // number of try for different servers
|
||||
// int8_t oldInUse; // server EP inUse passed by app
|
||||
// int8_t redirect; // flag to indicate redirect
|
||||
int8_t connType; // connection type
|
||||
int64_t rid; // refId returned by taosAddRef
|
||||
SRpcMsg* pRsp; // for synchronous API
|
||||
tsem_t* pSem; // for synchronous API
|
||||
char* ip;
|
||||
uint32_t port;
|
||||
// SEpSet* pSet; // for synchronous API
|
||||
} STransConnCtx;
|
||||
|
||||
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
|
||||
#pragma pack(push, 1)
|
||||
|
||||
typedef struct {
|
||||
char version : 4; // RPC version
|
||||
char comp : 4; // compression algorithm, 0:no compression 1:lz4
|
||||
char resflag : 2; // reserved bits
|
||||
char spi : 3; // security parameter index
|
||||
char encrypt : 3; // encrypt algorithm, 0: no encryption
|
||||
|
||||
uint32_t code; // del later
|
||||
uint32_t msgType;
|
||||
int32_t msgLen;
|
||||
uint8_t content[0]; // message body starts from here
|
||||
} STransMsgHead;
|
||||
|
||||
typedef struct {
|
||||
int32_t reserved;
|
||||
int32_t contLen;
|
||||
} STransCompMsg;
|
||||
|
||||
typedef struct {
|
||||
uint32_t timeStamp;
|
||||
uint8_t auth[TSDB_AUTH_LEN];
|
||||
} STransDigestMsg;
|
||||
|
||||
#pragma pack(pop)
|
||||
|
||||
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
||||
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
|
||||
|
||||
#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest))
|
||||
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
|
||||
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
|
||||
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
|
||||
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
|
||||
#define rpcIsReq(type) (type & 1U)
|
||||
|
||||
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
|
||||
|
||||
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead) + sizeof(STransDigestMsg))
|
||||
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
|
||||
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
|
||||
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
|
||||
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
|
||||
#define transIsReq(type) (type & 1U)
|
||||
|
||||
int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
||||
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
||||
int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
|
||||
SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead);
|
||||
|
||||
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
||||
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
||||
bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
|
||||
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
|
||||
|
||||
void transConnCtxDestroy(STransConnCtx* ctx);
|
||||
|
||||
typedef struct SConnBuffer {
|
||||
char* buf;
|
||||
int len;
|
||||
int cap;
|
||||
int left;
|
||||
} SConnBuffer;
|
||||
|
||||
#endif
|
||||
|
|
|
@ -45,6 +45,9 @@ extern "C" {
|
|||
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||
|
||||
void taosCloseServer(void* arg);
|
||||
void taosCloseClient(void* arg);
|
||||
|
||||
typedef struct {
|
||||
int sessions; // number of sessions allowed
|
||||
int numOfThreads; // number of threads to process incoming messages
|
||||
|
|
|
@ -17,15 +17,9 @@
|
|||
|
||||
#include "transComm.h"
|
||||
|
||||
typedef struct SConnBuffer {
|
||||
char* buf;
|
||||
int len;
|
||||
int cap;
|
||||
int left;
|
||||
} SConnBuffer;
|
||||
|
||||
void* (*taosHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {
|
||||
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {
|
||||
taosInitServer, taosInitClient};
|
||||
void (*taosCloseHandle[])(void* arg) = {taosCloseServer, taosCloseClient};
|
||||
|
||||
void* rpcOpen(const SRpcInit* pInit) {
|
||||
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
|
||||
|
@ -38,13 +32,18 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
pRpc->cfp = pInit->cfp;
|
||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||
pRpc->connType = pInit->connType;
|
||||
pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||
|
||||
return pRpc;
|
||||
}
|
||||
void rpcClose(void* arg) { return; }
|
||||
void rpcClose(void* arg) {
|
||||
SRpcInfo* pRpc = (SRpcInfo*)arg;
|
||||
(*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
|
||||
free(pRpc);
|
||||
return;
|
||||
}
|
||||
void* rpcMallocCont(int contLen) {
|
||||
int size = contLen + RPC_MSG_OVERHEAD;
|
||||
int size = contLen + TRANS_MSG_OVERHEAD;
|
||||
|
||||
char* start = (char*)calloc(1, (size_t)size);
|
||||
if (start == NULL) {
|
||||
|
@ -53,7 +52,7 @@ void* rpcMallocCont(int contLen) {
|
|||
} else {
|
||||
tTrace("malloc mem:%p size:%d", start, size);
|
||||
}
|
||||
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
|
||||
return start + sizeof(STransMsgHead);
|
||||
}
|
||||
void rpcFreeCont(void* cont) { return; }
|
||||
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
|
||||
|
@ -71,6 +70,7 @@ int32_t rpcInit(void) {
|
|||
|
||||
void rpcCleanup(void) {
|
||||
// impl later
|
||||
//
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -21,25 +21,32 @@ typedef struct SCliConn {
|
|||
uv_connect_t connReq;
|
||||
uv_stream_t* stream;
|
||||
uv_write_t* writeReq;
|
||||
SConnBuffer readBuf;
|
||||
void* data;
|
||||
queue conn;
|
||||
char spi;
|
||||
char secured;
|
||||
uint64_t expireTime;
|
||||
} SCliConn;
|
||||
|
||||
typedef struct SCliMsg {
|
||||
SRpcReqContext* context;
|
||||
queue q;
|
||||
uint64_t st;
|
||||
STransConnCtx* ctx;
|
||||
SRpcMsg msg;
|
||||
queue q;
|
||||
uint64_t st;
|
||||
} SCliMsg;
|
||||
|
||||
typedef struct SCliThrdObj {
|
||||
pthread_t thread;
|
||||
uv_loop_t* loop;
|
||||
uv_async_t* cliAsync; //
|
||||
void* cache; // conn pool
|
||||
uv_timer_t* pTimer;
|
||||
void* cache; // conn pool
|
||||
queue msg;
|
||||
pthread_mutex_t msgMtx;
|
||||
void* shandle;
|
||||
uint64_t nextTimeout; // next timeout
|
||||
void* shandle; //
|
||||
|
||||
} SCliThrdObj;
|
||||
|
||||
typedef struct SClientObj {
|
||||
|
@ -49,31 +56,188 @@ typedef struct SClientObj {
|
|||
SCliThrdObj** pThreadObj;
|
||||
} SClientObj;
|
||||
|
||||
typedef struct SConnList {
|
||||
queue conn;
|
||||
} SConnList;
|
||||
|
||||
// conn pool
|
||||
// add expire timeout and capacity limit
|
||||
static void* connCacheCreate(int size);
|
||||
static void* connCacheDestroy(void* cache);
|
||||
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port);
|
||||
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn);
|
||||
|
||||
static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
||||
// register timer in each thread to clear expire conn
|
||||
static void clientTimeoutCb(uv_timer_t* handle);
|
||||
// process data read from server, auth/decompress etc
|
||||
static void clientProcessData(SCliConn* conn);
|
||||
// check whether already read complete packet from server
|
||||
static bool clientReadComplete(SConnBuffer* pBuf);
|
||||
// alloc buf for read
|
||||
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
||||
// callback after read nbytes from socket
|
||||
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
||||
// callback after write data to socket
|
||||
static void clientWriteCb(uv_write_t* req, int status);
|
||||
// callback after conn to server
|
||||
static void clientConnCb(uv_connect_t* req, int status);
|
||||
static void clientAsyncCb(uv_async_t* handle);
|
||||
static void clientDestroy(uv_handle_t* handle);
|
||||
static void clientConnDestroy(SCliConn* pConn);
|
||||
|
||||
static void clientMsgDestroy(SCliMsg* pMsg);
|
||||
|
||||
static void* clientThread(void* arg);
|
||||
|
||||
static void clientProcessData(SCliConn* conn) {
|
||||
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
|
||||
SRpcInfo* pRpc = pCtx->ahandle;
|
||||
SRpcMsg rpcMsg;
|
||||
|
||||
rpcMsg.pCont = conn->readBuf.buf;
|
||||
rpcMsg.contLen = conn->readBuf.len;
|
||||
rpcMsg.ahandle = pCtx->ahandle;
|
||||
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
||||
// impl
|
||||
}
|
||||
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||
|
||||
static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||
static void clientTimeoutCb(uv_timer_t* handle) {
|
||||
SCliThrdObj* pThrd = handle->data;
|
||||
SRpcInfo* pRpc = pThrd->shandle;
|
||||
int64_t currentTime = pThrd->nextTimeout;
|
||||
|
||||
SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL);
|
||||
while (p != NULL) {
|
||||
while (!QUEUE_IS_EMPTY(&p->conn)) {
|
||||
queue* h = QUEUE_HEAD(&p->conn);
|
||||
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
|
||||
if (c->expireTime < currentTime) {
|
||||
QUEUE_REMOVE(h);
|
||||
clientConnDestroy(c);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
p = taosHashIterate((SHashObj*)pThrd->cache, p);
|
||||
}
|
||||
|
||||
pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
|
||||
uv_timer_start(handle, clientTimeoutCb, pRpc->idleTime * 10, 0);
|
||||
}
|
||||
static void* connCacheCreate(int size) {
|
||||
SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
return false;
|
||||
}
|
||||
static void* connCacheDestroy(void* cache) {
|
||||
SConnList* connList = taosHashIterate((SHashObj*)cache, NULL);
|
||||
while (connList != NULL) {
|
||||
while (!QUEUE_IS_EMPTY(&connList->conn)) {
|
||||
queue* h = QUEUE_HEAD(&connList->conn);
|
||||
QUEUE_REMOVE(h);
|
||||
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
|
||||
clientConnDestroy(c);
|
||||
}
|
||||
connList = taosHashIterate((SHashObj*)cache, connList);
|
||||
}
|
||||
taosHashClear(cache);
|
||||
}
|
||||
|
||||
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
|
||||
char key[128] = {0};
|
||||
tstrncpy(key, ip, strlen(ip));
|
||||
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
|
||||
|
||||
SHashObj* pCache = cache;
|
||||
SConnList* plist = taosHashGet(pCache, key, strlen(key));
|
||||
if (plist == NULL) {
|
||||
SConnList list;
|
||||
plist = &list;
|
||||
QUEUE_INIT(&plist->conn);
|
||||
taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist));
|
||||
}
|
||||
|
||||
if (QUEUE_IS_EMPTY(&plist->conn)) {
|
||||
return NULL;
|
||||
}
|
||||
queue* h = QUEUE_HEAD(&plist->conn);
|
||||
QUEUE_REMOVE(h);
|
||||
return QUEUE_DATA(h, SCliConn, conn);
|
||||
}
|
||||
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) {
|
||||
char key[128] = {0};
|
||||
tstrncpy(key, ip, strlen(ip));
|
||||
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
|
||||
|
||||
STransConnCtx* ctx = ((SCliMsg*)conn->data)->ctx;
|
||||
SRpcInfo* pRpc = ctx->pRpc;
|
||||
conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
|
||||
SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key));
|
||||
// list already create before
|
||||
assert(plist != NULL);
|
||||
QUEUE_PUSH(&plist->conn, &conn->conn);
|
||||
}
|
||||
static bool clientReadComplete(SConnBuffer* data) {
|
||||
STransMsgHead head;
|
||||
int32_t headLen = sizeof(head);
|
||||
if (data->len >= headLen) {
|
||||
memcpy((char*)&head, data->buf, headLen);
|
||||
int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
|
||||
if (msgLen > data->len) {
|
||||
data->left = msgLen - data->len;
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||
// impl later
|
||||
static const int CAPACITY = 512;
|
||||
|
||||
SCliConn* conn = handle->data;
|
||||
SConnBuffer* pBuf = &conn->readBuf;
|
||||
if (pBuf->cap == 0) {
|
||||
pBuf->buf = (char*)calloc(CAPACITY, sizeof(char));
|
||||
pBuf->len = 0;
|
||||
pBuf->cap = CAPACITY;
|
||||
pBuf->left = -1;
|
||||
buf->base = pBuf->buf;
|
||||
buf->len = CAPACITY;
|
||||
} else {
|
||||
if (pBuf->len >= pBuf->cap) {
|
||||
if (pBuf->left == -1) {
|
||||
pBuf->cap *= 2;
|
||||
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
|
||||
} else if (pBuf->len + pBuf->left > pBuf->cap) {
|
||||
pBuf->cap = pBuf->len + pBuf->left;
|
||||
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left);
|
||||
}
|
||||
}
|
||||
buf->base = pBuf->buf + pBuf->len;
|
||||
buf->len = pBuf->cap - pBuf->len;
|
||||
}
|
||||
}
|
||||
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||
// impl later
|
||||
SCliConn* conn = handle->data;
|
||||
SCliConn* conn = handle->data;
|
||||
SConnBuffer* pBuf = &conn->readBuf;
|
||||
if (nread > 0) {
|
||||
pBuf->len += nread;
|
||||
if (clientReadComplete(pBuf)) {
|
||||
tDebug("alread read complete pack");
|
||||
clientProcessData(conn);
|
||||
} else {
|
||||
tDebug("read halp packet, continue to read");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (nread != UV_EOF) {
|
||||
tDebug("Read error %s\n", uv_err_name(nread));
|
||||
}
|
||||
//
|
||||
uv_close((uv_handle_t*)handle, clientDestroy);
|
||||
}
|
||||
|
@ -84,6 +248,7 @@ static void clientConnDestroy(SCliConn* conn) {
|
|||
}
|
||||
static void clientDestroy(uv_handle_t* handle) {
|
||||
SCliConn* conn = handle->data;
|
||||
QUEUE_REMOVE(&conn->conn);
|
||||
clientConnDestroy(conn);
|
||||
}
|
||||
|
||||
|
@ -96,15 +261,17 @@ static void clientWriteCb(uv_write_t* req, int status) {
|
|||
return;
|
||||
}
|
||||
|
||||
uv_read_start((uv_stream_t*)pConn->stream, clientAllocrReadBufferCb, clientReadCb);
|
||||
uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb);
|
||||
// impl later
|
||||
}
|
||||
|
||||
static void clientWrite(SCliConn* pConn) {
|
||||
SCliMsg* pMsg = pConn->data;
|
||||
SRpcHead* pHead = rpcHeadFromCont(pMsg->context->pCont);
|
||||
int msgLen = rpcMsgLenFromCont(pMsg->context->contLen);
|
||||
char* msg = (char*)(pHead);
|
||||
SCliMsg* pCliMsg = pConn->data;
|
||||
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg);
|
||||
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
||||
|
||||
int msgLen = transMsgLenFromCont(pMsg->contLen);
|
||||
char* msg = (char*)(pHead);
|
||||
|
||||
uv_buf_t wb = uv_buf_init(msg, msgLen);
|
||||
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
|
||||
|
@ -118,77 +285,52 @@ static void clientConnCb(uv_connect_t* req, int status) {
|
|||
return;
|
||||
}
|
||||
|
||||
SCliMsg* pMsg = pConn->data;
|
||||
SEpSet* pEpSet = &pMsg->context->epSet;
|
||||
SRpcMsg rpcMsg;
|
||||
// rpcMsg.ahandle = pMsg->context->ahandle;
|
||||
// rpcMsg.pCont = NULL;
|
||||
SCliMsg* pMsg = pConn->data;
|
||||
STransConnCtx* pCtx = ((SCliMsg*)(pConn->data))->ctx;
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
rpcMsg.ahandle = pCtx->ahandle;
|
||||
|
||||
char* fqdn = pEpSet->fqdn[pEpSet->inUse];
|
||||
uint32_t port = pEpSet->port[pEpSet->inUse];
|
||||
if (status != 0) {
|
||||
// call user fp later
|
||||
tError("failed to connect server(%s, %d), errmsg: %s", fqdn, port, uv_strerror(status));
|
||||
SRpcInfo* pRpc = pMsg->context->pRpc;
|
||||
(pRpc->cfp)(NULL, &rpcMsg, pEpSet);
|
||||
tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
|
||||
SRpcInfo* pRpc = pMsg->ctx->pRpc;
|
||||
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
||||
uv_close((uv_handle_t*)req->handle, clientDestroy);
|
||||
return;
|
||||
}
|
||||
assert(pConn->stream == req->handle);
|
||||
}
|
||||
|
||||
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
|
||||
// impl later
|
||||
|
||||
return NULL;
|
||||
}
|
||||
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) {
|
||||
// impl later
|
||||
clientWrite(pConn);
|
||||
}
|
||||
|
||||
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||
SEpSet* pEpSet = &pMsg->context->epSet;
|
||||
|
||||
char* fqdn = pEpSet->fqdn[pEpSet->inUse];
|
||||
uint32_t port = pEpSet->port[pEpSet->inUse];
|
||||
|
||||
uint64_t el = taosGetTimestampUs() - pMsg->st;
|
||||
uint64_t et = taosGetTimestampUs();
|
||||
uint64_t el = et - pMsg->st;
|
||||
tDebug("msg tran time cost: %" PRIu64 "", el);
|
||||
et = taosGetTimestampUs();
|
||||
|
||||
SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port);
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
SCliConn* conn = getConnFromCache(pThrd->cache, pCtx->ip, pCtx->port);
|
||||
if (conn != NULL) {
|
||||
// impl later
|
||||
conn->data = pMsg;
|
||||
conn->writeReq->data = conn;
|
||||
clientWrite(conn);
|
||||
// uv_buf_t wb;
|
||||
// uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb);
|
||||
} else {
|
||||
SCliConn* conn = malloc(sizeof(SCliConn));
|
||||
SCliConn* conn = calloc(1, sizeof(SCliConn));
|
||||
|
||||
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
|
||||
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
|
||||
conn->writeReq = malloc(sizeof(uv_write_t));
|
||||
QUEUE_INIT(&conn->conn);
|
||||
|
||||
conn->connReq.data = conn;
|
||||
conn->data = pMsg;
|
||||
|
||||
struct sockaddr_in addr;
|
||||
uv_ip4_addr(fqdn, port, &addr);
|
||||
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
|
||||
// handle error in callback if fail to connect
|
||||
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
|
||||
|
||||
// SRpcMsg rpcMsg;
|
||||
// SEpSet* pEpSet = &pMsg->context->epSet;
|
||||
// SRpcInfo* pRpc = pMsg->context->pRpc;
|
||||
//// rpcMsg.ahandle = pMsg->context->ahandle;
|
||||
// rpcMsg.pCont = NULL;
|
||||
// rpcMsg.ahandle = pMsg->context->ahandle;
|
||||
// uint64_t el1 = taosGetTimestampUs() - et;
|
||||
// tError("msg tran back first: time cost: %" PRIu64 "", el1);
|
||||
// et = taosGetTimestampUs();
|
||||
//(pRpc->cfp)(NULL, &rpcMsg, pEpSet);
|
||||
// uint64_t el2 = taosGetTimestampUs() - et;
|
||||
// tError("msg tran back second: time cost: %" PRIu64 "", el2);
|
||||
}
|
||||
}
|
||||
static void clientAsyncCb(uv_async_t* handle) {
|
||||
|
@ -205,7 +347,8 @@ static void clientAsyncCb(uv_async_t* handle) {
|
|||
while (!QUEUE_IS_EMPTY(&wq)) {
|
||||
queue* h = QUEUE_HEAD(&wq);
|
||||
QUEUE_REMOVE(h);
|
||||
pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||
|
||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||
clientHandleReq(pMsg, pThrd);
|
||||
count++;
|
||||
if (count >= 2) {
|
||||
|
@ -216,11 +359,15 @@ static void clientAsyncCb(uv_async_t* handle) {
|
|||
|
||||
static void* clientThread(void* arg) {
|
||||
SCliThrdObj* pThrd = (SCliThrdObj*)arg;
|
||||
SRpcInfo* pRpc = pThrd->shandle;
|
||||
pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
|
||||
uv_timer_start(pThrd->pTimer, clientTimeoutCb, pRpc->idleTime * 10, 0);
|
||||
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
||||
}
|
||||
|
||||
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
|
||||
SClientObj* cli = calloc(1, sizeof(SClientObj));
|
||||
|
||||
memcpy(cli->label, label, strlen(label));
|
||||
cli->numOfThreads = numOfThreads;
|
||||
cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));
|
||||
|
@ -236,7 +383,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
|||
uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb);
|
||||
pThrd->cliAsync->data = pThrd;
|
||||
|
||||
pThrd->pTimer = malloc(sizeof(uv_timer_t));
|
||||
uv_timer_init(pThrd->loop, pThrd->pTimer);
|
||||
|
||||
pThrd->shandle = shandle;
|
||||
|
||||
int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
|
||||
if (err == 0) {
|
||||
tDebug("sucess to create tranport-client thread %d", i);
|
||||
|
@ -245,22 +396,44 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
|||
}
|
||||
return cli;
|
||||
}
|
||||
static void clientMsgDestroy(SCliMsg* pMsg) {
|
||||
// impl later
|
||||
free(pMsg);
|
||||
}
|
||||
void taosCloseClient(void* arg) {
|
||||
// impl later
|
||||
SClientObj* cli = arg;
|
||||
for (int i = 0; i < cli->numOfThreads; i++) {
|
||||
SCliThrdObj* pThrd = cli->pThreadObj[i];
|
||||
pthread_join(pThrd->thread, NULL);
|
||||
pthread_mutex_destroy(&pThrd->msgMtx);
|
||||
free(pThrd->cliAsync);
|
||||
free(pThrd->loop);
|
||||
free(pThrd);
|
||||
}
|
||||
free(cli->pThreadObj);
|
||||
free(cli);
|
||||
}
|
||||
|
||||
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
||||
// impl later
|
||||
char* ip = (char*)(pEpSet->fqdn[pEpSet->inUse]);
|
||||
uint32_t port = pEpSet->port[pEpSet->inUse];
|
||||
|
||||
SRpcInfo* pRpc = (SRpcInfo*)shandle;
|
||||
|
||||
int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
|
||||
int32_t flen = 0;
|
||||
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
|
||||
// imp later
|
||||
}
|
||||
|
||||
SRpcReqContext* pContext;
|
||||
pContext = (SRpcReqContext*)((char*)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
|
||||
pContext->ahandle = pMsg->ahandle;
|
||||
pContext->pRpc = (SRpcInfo*)shandle;
|
||||
pContext->epSet = *pEpSet;
|
||||
pContext->contLen = len;
|
||||
pContext->pCont = pMsg->pCont;
|
||||
pContext->msgType = pMsg->msgType;
|
||||
pContext->oldInUse = pEpSet->inUse;
|
||||
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
|
||||
|
||||
pCtx->pRpc = (SRpcInfo*)shandle;
|
||||
pCtx->ahandle = pMsg->ahandle;
|
||||
pCtx->msgType = pMsg->msgType;
|
||||
pCtx->ip = strdup(ip);
|
||||
pCtx->port = port;
|
||||
|
||||
assert(pRpc->connType == TAOS_CONN_CLIENT);
|
||||
// atomic or not
|
||||
|
@ -268,14 +441,15 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
|
|||
if (pRpc->index++ >= pRpc->numOfThreads) {
|
||||
pRpc->index = 0;
|
||||
}
|
||||
SCliMsg* msg = malloc(sizeof(SCliMsg));
|
||||
msg->context = pContext;
|
||||
msg->st = taosGetTimestampUs();
|
||||
SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
|
||||
cliMsg->ctx = pCtx;
|
||||
cliMsg->msg = *pMsg;
|
||||
cliMsg->st = taosGetTimestampUs();
|
||||
|
||||
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
|
||||
|
||||
pthread_mutex_lock(&thrd->msgMtx);
|
||||
QUEUE_PUSH(&thrd->msg, &msg->q);
|
||||
QUEUE_PUSH(&thrd->msg, &cliMsg->q);
|
||||
pthread_mutex_unlock(&thrd->msgMtx);
|
||||
|
||||
uv_async_send(thrd->cliAsync);
|
||||
|
|
|
@ -30,6 +30,20 @@ int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
|
|||
|
||||
return ret;
|
||||
}
|
||||
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
|
||||
T_MD5_CTX context;
|
||||
int ret = -1;
|
||||
|
||||
tMD5Init(&context);
|
||||
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
|
||||
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
|
||||
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
|
||||
tMD5Final(&context);
|
||||
|
||||
if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;
|
||||
|
||||
return ret;
|
||||
}
|
||||
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
|
||||
T_MD5_CTX context;
|
||||
|
||||
|
@ -41,6 +55,17 @@ void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
|
|||
|
||||
memcpy(pAuth, context.digest, sizeof(context.digest));
|
||||
}
|
||||
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
|
||||
T_MD5_CTX context;
|
||||
|
||||
tMD5Init(&context);
|
||||
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
|
||||
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
|
||||
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
|
||||
tMD5Final(&context);
|
||||
|
||||
memcpy(pAuth, context.digest, sizeof(context.digest));
|
||||
}
|
||||
|
||||
int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
|
||||
SRpcHead* pHead = rpcHeadFromCont(pCont);
|
||||
|
@ -81,6 +106,54 @@ int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
|
|||
return finalLen;
|
||||
}
|
||||
|
||||
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
|
||||
return false;
|
||||
// SRpcHead* pHead = rpcHeadFromCont(pCont);
|
||||
bool succ = false;
|
||||
int overhead = sizeof(STransCompMsg);
|
||||
if (!NEEDTO_COMPRESSS_MSG(len)) {
|
||||
return succ;
|
||||
}
|
||||
|
||||
char* buf = malloc(len + overhead + 8); // 8 extra bytes
|
||||
if (buf == NULL) {
|
||||
tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
|
||||
*flen = len;
|
||||
return succ;
|
||||
}
|
||||
|
||||
int32_t clen = LZ4_compress_default(msg, buf, len, len + overhead);
|
||||
tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, overhead);
|
||||
/*
|
||||
* only the compressed size is less than the value of contLen - overhead, the compression is applied
|
||||
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
|
||||
*/
|
||||
if (clen > 0 && clen < len - overhead) {
|
||||
STransCompMsg* pComp = (STransCompMsg*)msg;
|
||||
pComp->reserved = 0;
|
||||
pComp->contLen = htonl(len);
|
||||
memcpy(msg + overhead, buf, clen);
|
||||
|
||||
tDebug("compress rpc msg, before:%d, after:%d", len, clen);
|
||||
*flen = clen + overhead;
|
||||
succ = true;
|
||||
} else {
|
||||
*flen = len;
|
||||
succ = false;
|
||||
}
|
||||
free(buf);
|
||||
return succ;
|
||||
}
|
||||
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) {
|
||||
// impl later
|
||||
return false;
|
||||
STransCompMsg* pComp = (STransCompMsg*)msg;
|
||||
|
||||
int overhead = sizeof(STransCompMsg);
|
||||
int clen = 0;
|
||||
return false;
|
||||
}
|
||||
|
||||
SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) {
|
||||
int overhead = sizeof(SRpcComp);
|
||||
SRpcHead* pNewHead = NULL;
|
||||
|
@ -114,4 +187,8 @@ SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) {
|
|||
return pHead;
|
||||
}
|
||||
|
||||
void transConnCtxDestroy(STransConnCtx* ctx) {
|
||||
free(ctx->ip);
|
||||
free(ctx);
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -16,13 +16,6 @@
|
|||
#ifdef USE_UV
|
||||
#include "transComm.h"
|
||||
|
||||
typedef struct SConnBuffer {
|
||||
char* buf;
|
||||
int len;
|
||||
int cap;
|
||||
int left;
|
||||
} SConnBuffer;
|
||||
|
||||
typedef struct SConn {
|
||||
uv_tcp_t* pTcp;
|
||||
uv_write_t* pWriter;
|
||||
|
@ -100,31 +93,32 @@ static void* acceptThread(void* arg);
|
|||
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||
/*
|
||||
* formate of data buffer:
|
||||
* |<-------SRpcReqContext------->|<------------data read from socket----------->|
|
||||
* |<--------------------------data from socket------------------------------->|
|
||||
* |<------STransMsgHead------->|<-------------------other data--------------->|
|
||||
*/
|
||||
static const int CAPACITY = 1024;
|
||||
|
||||
SConn* conn = handle->data;
|
||||
SConnBuffer* pBuf = &conn->connBuf;
|
||||
if (pBuf->cap == 0) {
|
||||
pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char));
|
||||
pBuf->buf = (char*)calloc(CAPACITY, sizeof(char));
|
||||
pBuf->len = 0;
|
||||
pBuf->cap = CAPACITY;
|
||||
pBuf->left = -1;
|
||||
|
||||
buf->base = pBuf->buf + RPC_RESERVE_SIZE;
|
||||
buf->base = pBuf->buf;
|
||||
buf->len = CAPACITY;
|
||||
} else {
|
||||
if (pBuf->len >= pBuf->cap) {
|
||||
if (pBuf->left == -1) {
|
||||
pBuf->cap *= 2;
|
||||
pBuf->buf = realloc(pBuf->buf, pBuf->cap + RPC_RESERVE_SIZE);
|
||||
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
|
||||
} else if (pBuf->len + pBuf->left > pBuf->cap) {
|
||||
pBuf->cap = pBuf->len + pBuf->left;
|
||||
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left + RPC_RESERVE_SIZE);
|
||||
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left);
|
||||
}
|
||||
}
|
||||
buf->base = pBuf->buf + pBuf->len + RPC_RESERVE_SIZE;
|
||||
buf->base = pBuf->buf + pBuf->len;
|
||||
buf->len = pBuf->cap - pBuf->len;
|
||||
}
|
||||
}
|
||||
|
@ -133,11 +127,11 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
|
|||
//
|
||||
static bool readComplete(SConnBuffer* data) {
|
||||
// TODO(yihao): handle pipeline later
|
||||
SRpcHead rpcHead;
|
||||
int32_t headLen = sizeof(rpcHead);
|
||||
STransMsgHead head;
|
||||
int32_t headLen = sizeof(head);
|
||||
if (data->len >= headLen) {
|
||||
memcpy((char*)&rpcHead, data->buf + RPC_RESERVE_SIZE, headLen);
|
||||
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
|
||||
memcpy((char*)&head, data->buf, headLen);
|
||||
int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
|
||||
if (msgLen > data->len) {
|
||||
data->left = msgLen - data->len;
|
||||
return false;
|
||||
|
@ -150,21 +144,21 @@ static bool readComplete(SConnBuffer* data) {
|
|||
}
|
||||
|
||||
static void uvDoProcess(SRecvInfo* pRecv) {
|
||||
SRpcHead* pHead = (SRpcHead*)pRecv->msg;
|
||||
SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
|
||||
SConn* pConn = pRecv->thandle;
|
||||
|
||||
// impl later
|
||||
STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
|
||||
SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
|
||||
SConn* pConn = pRecv->thandle;
|
||||
tDump(pRecv->msg, pRecv->msgLen);
|
||||
|
||||
terrno = 0;
|
||||
SRpcReqContext* pContest;
|
||||
// SRpcReqContext* pContest;
|
||||
|
||||
// do auth and check
|
||||
}
|
||||
|
||||
static int uvAuthMsg(SConn* pConn, char* msg, int len) {
|
||||
SRpcHead* pHead = (SRpcHead*)msg;
|
||||
int code = 0;
|
||||
STransMsgHead* pHead = (STransMsgHead*)msg;
|
||||
|
||||
int code = 0;
|
||||
|
||||
if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
|
||||
// secured link, or no authentication
|
||||
|
@ -216,15 +210,15 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) {
|
|||
|
||||
// refers specifically to query or insert timeout
|
||||
static void uvHandleActivityTimeout(uv_timer_t* handle) {
|
||||
// impl later
|
||||
SConn* conn = handle->data;
|
||||
tDebug("%p timeout since no activity", conn);
|
||||
}
|
||||
|
||||
static void uvProcessData(SConn* pConn) {
|
||||
SRecvInfo info;
|
||||
SRecvInfo* p = &info;
|
||||
SConnBuffer* pBuf = &pConn->connBuf;
|
||||
p->msg = pBuf->buf + RPC_RESERVE_SIZE;
|
||||
p->msg = pBuf->buf;
|
||||
p->msgLen = pBuf->len;
|
||||
p->ip = 0;
|
||||
p->port = 0;
|
||||
|
@ -233,28 +227,33 @@ static void uvProcessData(SConn* pConn) {
|
|||
p->chandle = NULL;
|
||||
|
||||
//
|
||||
SRpcHead* pHead = (SRpcHead*)p->msg;
|
||||
assert(rpcIsReq(pHead->msgType));
|
||||
STransMsgHead* pHead = (STransMsgHead*)p->msg;
|
||||
assert(transIsReq(pHead->msgType));
|
||||
|
||||
SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
|
||||
pConn->ahandle = (void*)pHead->ahandle;
|
||||
// auth here
|
||||
// auth should not do in rpc thread
|
||||
|
||||
int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
return;
|
||||
}
|
||||
// int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
|
||||
// if (code != 0) {
|
||||
// terrno = code;
|
||||
// return;
|
||||
//}
|
||||
pHead->code = htonl(pHead->code);
|
||||
|
||||
int32_t dlen = 0;
|
||||
SRpcMsg rpcMsg;
|
||||
|
||||
pHead = rpcDecompressRpcMsg(pHead);
|
||||
rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
|
||||
if (transDecompressMsg(NULL, 0, NULL)) {
|
||||
// add compress later
|
||||
// pHead = rpcDecompressRpcMsg(pHead);
|
||||
} else {
|
||||
// impl later
|
||||
}
|
||||
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
|
||||
rpcMsg.pCont = pHead->content;
|
||||
rpcMsg.msgType = pHead->msgType;
|
||||
rpcMsg.code = pHead->code;
|
||||
rpcMsg.ahandle = pConn->ahandle;
|
||||
rpcMsg.ahandle = NULL;
|
||||
rpcMsg.handle = pConn;
|
||||
|
||||
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
|
||||
|
@ -265,13 +264,13 @@ static void uvProcessData(SConn* pConn) {
|
|||
|
||||
void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||
// opt
|
||||
SConn* ctx = cli->data;
|
||||
SConnBuffer* pBuf = &ctx->connBuf;
|
||||
SConn* conn = cli->data;
|
||||
SConnBuffer* pBuf = &conn->connBuf;
|
||||
if (nread > 0) {
|
||||
pBuf->len += nread;
|
||||
if (readComplete(pBuf)) {
|
||||
tDebug("alread read complete packet");
|
||||
uvProcessData(ctx);
|
||||
uvProcessData(conn);
|
||||
} else {
|
||||
tDebug("read half packet, continue to read");
|
||||
}
|
||||
|
@ -320,6 +319,9 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
|||
return;
|
||||
}
|
||||
uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len);
|
||||
|
||||
uv_timer_stop(conn->pTimer);
|
||||
|
||||
uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
|
||||
}
|
||||
}
|
||||
|
@ -423,7 +425,7 @@ void* workerThread(void* arg) {
|
|||
uv_loop_init(pThrd->loop);
|
||||
|
||||
// SRpcInfo* pRpc = pThrd->shandle;
|
||||
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
|
||||
uv_pipe_init(pThrd->loop, pThrd->pipe, 0);
|
||||
uv_pipe_open(pThrd->pipe, pThrd->fd);
|
||||
|
||||
pThrd->pipe->data = pThrd;
|
||||
|
@ -491,6 +493,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
|||
|
||||
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||
SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
|
||||
|
||||
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
|
||||
int fds[2];
|
||||
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
|
||||
|
@ -522,6 +525,22 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
|||
|
||||
return srv;
|
||||
}
|
||||
void taosCloseServer(void* arg) {
|
||||
// impl later
|
||||
SServerObj* srv = arg;
|
||||
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||
SWorkThrdObj* pThrd = srv->pThreadObj[i];
|
||||
pthread_join(pThrd->thread, NULL);
|
||||
free(srv->pipe[i]);
|
||||
free(pThrd->loop);
|
||||
free(pThrd);
|
||||
}
|
||||
free(srv->loop);
|
||||
free(srv->pipe);
|
||||
free(srv->pThreadObj);
|
||||
pthread_join(srv->thread, NULL);
|
||||
free(srv);
|
||||
}
|
||||
|
||||
void rpcSendResponse(const SRpcMsg* pMsg) {
|
||||
SConn* pConn = pMsg->handle;
|
||||
|
|
|
@ -40,6 +40,7 @@ static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
if (pEpSet) pInfo->epSet = *pEpSet;
|
||||
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
// tsem_post(&pInfo->rspSem);
|
||||
tsem_post(&pInfo->rspSem);
|
||||
}
|
||||
|
||||
|
@ -60,6 +61,7 @@ static void *sendRequest(void *param) {
|
|||
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
||||
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||
// tsem_wait(&pInfo->rspSem);
|
||||
tsem_wait(&pInfo->rspSem);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,196 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
|
||||
#include "os.h"
|
||||
#include "tutil.h"
|
||||
#include "tglobal.h"
|
||||
#include "rpcLog.h"
|
||||
#include "trpc.h"
|
||||
#include "taoserror.h"
|
||||
|
||||
typedef struct {
|
||||
int index;
|
||||
SRpcEpSet epSet;
|
||||
int num;
|
||||
int numOfReqs;
|
||||
int msgSize;
|
||||
tsem_t rspSem;
|
||||
tsem_t *pOverSem;
|
||||
pthread_t thread;
|
||||
void *pRpc;
|
||||
} SInfo;
|
||||
|
||||
|
||||
static int tcount = 0;
|
||||
static int terror = 0;
|
||||
|
||||
static void *sendRequest(void *param) {
|
||||
SInfo *pInfo = (SInfo *)param;
|
||||
SRpcMsg rpcMsg, rspMsg;
|
||||
|
||||
tDebug("thread:%d, start to send request", pInfo->index);
|
||||
|
||||
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
|
||||
pInfo->num++;
|
||||
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
||||
rpcMsg.contLen = pInfo->msgSize;
|
||||
rpcMsg.handle = pInfo;
|
||||
rpcMsg.msgType = 1;
|
||||
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||
|
||||
rpcSendRecv(pInfo->pRpc, &pInfo->epSet, &rpcMsg, &rspMsg);
|
||||
|
||||
// handle response
|
||||
if (rspMsg.code != 0) terror++;
|
||||
|
||||
tDebug("thread:%d, rspLen:%d code:%d", pInfo->index, rspMsg.contLen, rspMsg.code);
|
||||
|
||||
rpcFreeCont(rspMsg.pCont);
|
||||
|
||||
if ( pInfo->num % 20000 == 0 )
|
||||
tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||
}
|
||||
|
||||
tDebug("thread:%d, it is over", pInfo->index);
|
||||
tcount++;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
SRpcInit rpcInit;
|
||||
SRpcEpSet epSet;
|
||||
int msgSize = 128;
|
||||
int numOfReqs = 0;
|
||||
int appThreads = 1;
|
||||
char serverIp[40] = "127.0.0.1";
|
||||
char secret[TSDB_KEY_LEN] = "mypassword";
|
||||
struct timeval systemTime;
|
||||
int64_t startTime, endTime;
|
||||
pthread_attr_t thattr;
|
||||
|
||||
// server info
|
||||
epSet.numOfEps = 1;
|
||||
epSet.inUse = 0;
|
||||
epSet.port[0] = 7000;
|
||||
epSet.port[1] = 7000;
|
||||
strcpy(epSet.fqdn[0], serverIp);
|
||||
strcpy(epSet.fqdn[1], "192.168.0.1");
|
||||
|
||||
// client info
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
//rpcInit.localIp = "0.0.0.0";
|
||||
rpcInit.localPort = 0;
|
||||
rpcInit.label = "APP";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.sessions = 100;
|
||||
rpcInit.idleTime = tsShellActivityTimer*1000;
|
||||
rpcInit.user = "michael";
|
||||
rpcInit.secret = secret;
|
||||
rpcInit.ckey = "key";
|
||||
rpcInit.spi = 1;
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
|
||||
for (int i=1; i<argc; ++i) {
|
||||
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
||||
epSet.port[0] = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
|
||||
tstrncpy(epSet.fqdn[0], argv[++i], sizeof(epSet.fqdn[0]));
|
||||
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
||||
rpcInit.numOfThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
|
||||
msgSize = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
|
||||
rpcInit.sessions = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
|
||||
numOfReqs = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
|
||||
appThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
|
||||
tsCompressMsgSize = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-u")==0 && i < argc-1) {
|
||||
rpcInit.user = argv[++i];
|
||||
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
|
||||
rpcInit.secret = argv[++i];
|
||||
} else if (strcmp(argv[i], "-spi")==0 && i < argc-1) {
|
||||
rpcInit.spi = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
||||
rpcDebugFlag = atoi(argv[++i]);
|
||||
} else {
|
||||
printf("\nusage: %s [options] \n", argv[0]);
|
||||
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
|
||||
printf(" [-p port]: server port number, default is:%d\n", epSet.port[0]);
|
||||
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
|
||||
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
|
||||
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
|
||||
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
|
||||
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
||||
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
|
||||
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
||||
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
|
||||
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
|
||||
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
||||
printf(" [-h help]: print out this help\n\n");
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
taosInitLog("client.log", 100000, 10);
|
||||
|
||||
void *pRpc = rpcOpen(&rpcInit);
|
||||
if (pRpc == NULL) {
|
||||
tError("failed to initialize RPC");
|
||||
return -1;
|
||||
}
|
||||
|
||||
tInfo("client is initialized");
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
|
||||
|
||||
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
for (int i=0; i<appThreads; ++i) {
|
||||
pInfo->index = i;
|
||||
pInfo->epSet = epSet;
|
||||
pInfo->numOfReqs = numOfReqs;
|
||||
pInfo->msgSize = msgSize;
|
||||
tsem_init(&pInfo->rspSem, 0, 0);
|
||||
pInfo->pRpc = pRpc;
|
||||
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
|
||||
pInfo++;
|
||||
}
|
||||
|
||||
do {
|
||||
usleep(1);
|
||||
} while ( tcount < appThreads);
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
|
||||
float usedTime = (endTime - startTime)/1000.0; // mseconds
|
||||
|
||||
tInfo("it takes %.3f mseconds to send %d requests to server, error num:%d", usedTime, numOfReqs*appThreads, terror);
|
||||
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
|
||||
|
||||
taosCloseLog();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,207 @@
|
|||
#include <assert.h>
|
||||
#include <pthread.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "task.h"
|
||||
#include <uv.h>
|
||||
|
||||
#define NUM_OF_THREAD 1
|
||||
#define TIMEOUT 10000
|
||||
|
||||
typedef struct SThreadObj {
|
||||
pthread_t thread;
|
||||
uv_pipe_t *pipe;
|
||||
uv_loop_t *loop;
|
||||
uv_async_t *workerAsync; //
|
||||
int fd;
|
||||
} SThreadObj;
|
||||
|
||||
typedef struct SServerObj {
|
||||
uv_tcp_t server;
|
||||
uv_loop_t *loop;
|
||||
int workerIdx;
|
||||
int numOfThread;
|
||||
SThreadObj **pThreadObj;
|
||||
uv_pipe_t **pipe;
|
||||
} SServerObj;
|
||||
|
||||
typedef struct SConnCtx {
|
||||
uv_tcp_t *pClient;
|
||||
uv_timer_t *pTimer;
|
||||
uv_async_t *pWorkerAsync;
|
||||
int ref;
|
||||
} SConnCtx;
|
||||
|
||||
void echo_write(uv_write_t *req, int status) {
|
||||
if (status < 0) {
|
||||
fprintf(stderr, "Write error %s\n", uv_err_name(status));
|
||||
}
|
||||
printf("write data to client\n");
|
||||
free(req);
|
||||
}
|
||||
|
||||
void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
|
||||
|
||||
SConnCtx *pConn = container_of(client, SConnCtx, pClient);
|
||||
pConn->ref += 1;
|
||||
printf("read data %d\n", nread, buf->base, buf->len);
|
||||
if (nread > 0) {
|
||||
uv_write_t *req = (uv_write_t *)malloc(sizeof(uv_write_t));
|
||||
// dispatch request to database other process thread
|
||||
// just write out
|
||||
uv_buf_t write_out;
|
||||
write_out.base = buf->base;
|
||||
write_out.len = nread;
|
||||
uv_write((uv_write_t *)req, client, &write_out, 1, echo_write);
|
||||
free(buf->base);
|
||||
return;
|
||||
}
|
||||
|
||||
if (nread < 0) {
|
||||
if (nread != UV_EOF)
|
||||
fprintf(stderr, "Read error %s\n", uv_err_name(nread));
|
||||
uv_close((uv_handle_t *)client, NULL);
|
||||
}
|
||||
free(buf->base);
|
||||
}
|
||||
|
||||
void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
|
||||
buf->base = malloc(suggested_size);
|
||||
buf->len = suggested_size;
|
||||
}
|
||||
|
||||
void on_new_connection(uv_stream_t *s, int status) {
|
||||
if (status == -1) {
|
||||
// error!
|
||||
return;
|
||||
}
|
||||
SServerObj *pObj = container_of(s, SServerObj, server);
|
||||
printf("new_connection from client\n");
|
||||
|
||||
uv_tcp_t *client = (uv_tcp_t *)malloc(sizeof(uv_tcp_t));
|
||||
uv_tcp_init(pObj->loop, client);
|
||||
if (uv_accept(s, (uv_stream_t *)client) == 0) {
|
||||
uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t));
|
||||
uv_buf_t dummy_buf = uv_buf_init("a", 1);
|
||||
// despatch to worker thread
|
||||
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread;
|
||||
uv_write2(write_req, (uv_stream_t *)&(pObj->pipe[pObj->workerIdx][0]),
|
||||
&dummy_buf, 1, (uv_stream_t *)client, echo_write);
|
||||
} else {
|
||||
uv_close((uv_handle_t *)client, NULL);
|
||||
}
|
||||
}
|
||||
void child_on_new_connection(uv_stream_t *q, ssize_t nread,
|
||||
const uv_buf_t *buf) {
|
||||
printf("x child_on_new_connection \n");
|
||||
if (nread < 0) {
|
||||
if (nread != UV_EOF)
|
||||
fprintf(stderr, "Read error %s\n", uv_err_name(nread));
|
||||
uv_close((uv_handle_t *)q, NULL);
|
||||
return;
|
||||
}
|
||||
SThreadObj *pObj = (SThreadObj *)container_of(q, struct SThreadObj, pipe);
|
||||
|
||||
uv_pipe_t *pipe = (uv_pipe_t *)q;
|
||||
if (!uv_pipe_pending_count(pipe)) {
|
||||
fprintf(stderr, "No pending count\n");
|
||||
return;
|
||||
}
|
||||
|
||||
uv_handle_type pending = uv_pipe_pending_type(pipe);
|
||||
assert(pending == UV_TCP);
|
||||
|
||||
SConnCtx *pConn = malloc(sizeof(SConnCtx));
|
||||
|
||||
/* init conn timer*/
|
||||
pConn->pTimer = malloc(sizeof(uv_timer_t));
|
||||
uv_timer_init(pObj->loop, pConn->pTimer);
|
||||
|
||||
pConn->pClient = (uv_tcp_t *)malloc(sizeof(uv_tcp_t));
|
||||
pConn->pWorkerAsync = pObj->workerAsync; // thread safty
|
||||
uv_tcp_init(pObj->loop, pConn->pClient);
|
||||
|
||||
if (uv_accept(q, (uv_stream_t *)(pConn->pClient)) == 0) {
|
||||
uv_os_fd_t fd;
|
||||
uv_fileno((const uv_handle_t *)pConn->pClient, &fd);
|
||||
fprintf(stderr, "Worker Accepted fd %d\n", fd);
|
||||
uv_timer_start(pConn->pTimer, timeOutCallBack, TIMEOUT, 0);
|
||||
uv_read_start((uv_stream_t *)(pConn->pClient), alloc_buffer, echo_read);
|
||||
} else {
|
||||
uv_timer_stop(pConn->pTimer);
|
||||
free(pConn->pTimer);
|
||||
uv_close((uv_handle_t *)pConn->pClient, NULL);
|
||||
free(pConn->pClient);
|
||||
free(pConn);
|
||||
}
|
||||
}
|
||||
|
||||
static void workerAsyncCallback(uv_async_t *handle) {
|
||||
SThreadObj *pObj = container_of(handle, SThreadObj, workerAsync);
|
||||
// do nothing
|
||||
}
|
||||
void *worker_thread(void *arg) {
|
||||
SThreadObj *pObj = (SThreadObj *)arg;
|
||||
int fd = pObj->fd;
|
||||
pObj->loop = (uv_loop_t *)malloc(sizeof(uv_loop_t));
|
||||
uv_loop_init(pObj->loop);
|
||||
|
||||
uv_pipe_init(pObj->loop, pObj->pipe, 1);
|
||||
uv_pipe_open(pObj->pipe, fd);
|
||||
|
||||
pObj->workerAsync = malloc(sizeof(uv_async_t));
|
||||
uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCallback);
|
||||
uv_read_start((uv_stream_t *)pObj->pipe, alloc_buffer,
|
||||
child_on_new_connection);
|
||||
|
||||
uv_run(pObj->loop, UV_RUN_DEFAULT);
|
||||
}
|
||||
int main() {
|
||||
|
||||
SServerObj *server = calloc(1, sizeof(SServerObj));
|
||||
server->loop = (uv_loop_t *)malloc(sizeof(uv_loop_t));
|
||||
server->numOfThread = NUM_OF_THREAD;
|
||||
server->workerIdx = 0;
|
||||
server->pThreadObj =
|
||||
(SThreadObj **)calloc(server->numOfThread, sizeof(SThreadObj *));
|
||||
server->pipe = (uv_pipe_t **)calloc(server->numOfThread, sizeof(uv_pipe_t *));
|
||||
|
||||
uv_loop_init(server->loop);
|
||||
|
||||
for (int i = 0; i < server->numOfThread; i++) {
|
||||
server->pThreadObj[i] = (SThreadObj *)calloc(1, sizeof(SThreadObj));
|
||||
server->pipe[i] = (uv_pipe_t *)calloc(2, sizeof(uv_pipe_t));
|
||||
int fds[2];
|
||||
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE,
|
||||
UV_NONBLOCK_PIPE) != 0) {
|
||||
return -1;
|
||||
}
|
||||
uv_pipe_init(server->loop, &(server->pipe[i][0]), 1);
|
||||
uv_pipe_open(&(server->pipe[i][0]), fds[1]); // init write
|
||||
|
||||
server->pThreadObj[i]->fd = fds[0];
|
||||
server->pThreadObj[i]->pipe = &(server->pipe[i][1]); // init read
|
||||
int err = pthread_create(&(server->pThreadObj[i]->thread), NULL,
|
||||
worker_thread, (void *)(server->pThreadObj[i]));
|
||||
if (err == 0) {
|
||||
printf("thread %d create\n", i);
|
||||
} else {
|
||||
printf("thread %d create failed", i);
|
||||
}
|
||||
|
||||
uv_tcp_init(server->loop, &server->server);
|
||||
struct sockaddr_in bind_addr;
|
||||
uv_ip4_addr("0.0.0.0", 7000, &bind_addr);
|
||||
uv_tcp_bind(&server->server, (const struct sockaddr *)&bind_addr, 0);
|
||||
int err = 0;
|
||||
if ((err = uv_listen((uv_stream_t *)&server->server, 128,
|
||||
on_new_connection)) != 0) {
|
||||
fprintf(stderr, "Listen error %s\n", uv_err_name(err));
|
||||
return 2;
|
||||
}
|
||||
uv_run(server->loop, UV_RUN_DEFAULT);
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -361,6 +361,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPING, "Task dropping")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUPLICATTED_OPERATION, "Duplicatted operation")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_MSG_ERROR, "Task message error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_FREED, "Job already freed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_STATUS_ERROR, "Task status error")
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ char dbName[32] = "db";
|
|||
char stbName[64] = "st";
|
||||
int32_t numOfThreads = 1;
|
||||
int64_t numOfTables = 200000;
|
||||
int64_t startOffset = 0;
|
||||
int32_t createTable = 1;
|
||||
int32_t insertData = 0;
|
||||
int32_t batchNumOfTbl = 100;
|
||||
|
@ -84,7 +85,7 @@ void createDbAndStb() {
|
|||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
sprintf(qstr, "create table %s (ts timestamp, i int) tags (j int)", stbName);
|
||||
sprintf(qstr, "create table if not exists %s (ts timestamp, i int) tags (j int)", stbName);
|
||||
pRes = taos_query(con, qstr);
|
||||
code = taos_errno(pRes);
|
||||
if (code != 0) {
|
||||
|
@ -181,8 +182,19 @@ void *threadFunc(void *param) {
|
|||
exit(1);
|
||||
}
|
||||
|
||||
// printf("thread:%d, table range: %"PRId64 " - %"PRId64 "\n", pInfo->threadIndex, pInfo->tableBeginIndex,
|
||||
// pInfo->tableEndIndex);
|
||||
pError("====before thread:%d, table range: %"PRId64 " - %"PRId64 "\n",
|
||||
pInfo->threadIndex,
|
||||
pInfo->tableBeginIndex,
|
||||
pInfo->tableEndIndex);
|
||||
|
||||
pInfo->tableBeginIndex += startOffset;
|
||||
pInfo->tableEndIndex += startOffset;
|
||||
|
||||
pError("====after thread:%d, table range: %"PRId64 " - %"PRId64 "\n",
|
||||
pInfo->threadIndex,
|
||||
pInfo->tableBeginIndex,
|
||||
pInfo->tableEndIndex);
|
||||
|
||||
sprintf(qstr, "use %s", pInfo->dbName);
|
||||
TAOS_RES *pRes = taos_query(con, qstr);
|
||||
taos_free_result(pRes);
|
||||
|
@ -210,7 +222,7 @@ void *threadFunc(void *param) {
|
|||
TAOS_RES *pRes = taos_query(con, qstr);
|
||||
code = taos_errno(pRes);
|
||||
if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
|
||||
pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code));
|
||||
pError("failed to create table reason:%s, sql: %s", tstrerror(code), qstr);
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
int64_t endTs = taosGetTimestampUs();
|
||||
|
@ -296,6 +308,8 @@ void printHelp() {
|
|||
printf("%s%s%s%d\n", indent, indent, "numOfThreads, default is ", numOfThreads);
|
||||
printf("%s%s\n", indent, "-n");
|
||||
printf("%s%s%s%" PRId64 "\n", indent, indent, "numOfTables, default is ", numOfTables);
|
||||
printf("%s%s\n", indent, "-g");
|
||||
printf("%s%s%s%" PRId64 "\n", indent, indent, "startOffset, default is ", startOffset);
|
||||
printf("%s%s\n", indent, "-v");
|
||||
printf("%s%s%s%d\n", indent, indent, "numOfVgroups, default is ", numOfVgroups);
|
||||
printf("%s%s\n", indent, "-a");
|
||||
|
@ -329,6 +343,8 @@ void parseArgument(int32_t argc, char *argv[]) {
|
|||
numOfThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-n") == 0) {
|
||||
numOfTables = atoll(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-g") == 0) {
|
||||
startOffset = atoll(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-v") == 0) {
|
||||
numOfVgroups = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-a") == 0) {
|
||||
|
@ -352,6 +368,7 @@ void parseArgument(int32_t argc, char *argv[]) {
|
|||
pPrint("%s stbName:%s %s", GREEN, stbName, NC);
|
||||
pPrint("%s configDir:%s %s", GREEN, configDir, NC);
|
||||
pPrint("%s numOfTables:%" PRId64 " %s", GREEN, numOfTables, NC);
|
||||
pPrint("%s startOffset:%" PRId64 " %s", GREEN, startOffset, NC);
|
||||
pPrint("%s numOfThreads:%d %s", GREEN, numOfThreads, NC);
|
||||
pPrint("%s numOfVgroups:%d %s", GREEN, numOfVgroups, NC);
|
||||
pPrint("%s createTable:%d %s", GREEN, createTable, NC);
|
||||
|
@ -381,7 +398,7 @@ int32_t main(int32_t argc, char *argv[]) {
|
|||
createDbAndStb();
|
||||
}
|
||||
|
||||
pPrint("%d threads are spawned to create %" PRId64 " tables", numOfThreads, numOfTables);
|
||||
pPrint("%d threads are spawned to create %" PRId64 " tables, offset is %" PRId64 " ", numOfThreads, numOfTables, startOffset);
|
||||
|
||||
pthread_attr_t thattr;
|
||||
pthread_attr_init(&thattr);
|
||||
|
@ -407,7 +424,7 @@ int32_t main(int32_t argc, char *argv[]) {
|
|||
int64_t tableFrom = 0;
|
||||
for (int32_t i = 0; i < numOfThreads; ++i) {
|
||||
pInfo[i].tableBeginIndex = tableFrom;
|
||||
pInfo[i].tableEndIndex = i < b ? tableFrom + a : tableFrom + a - 1;
|
||||
pInfo[i].tableEndIndex = (i < b ? tableFrom + a : tableFrom + a - 1);
|
||||
tableFrom = pInfo[i].tableEndIndex + 1;
|
||||
pInfo[i].threadIndex = i;
|
||||
pInfo[i].minDelay = INT64_MAX;
|
||||
|
|
Loading…
Reference in New Issue