Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/hzcheng_3.0

This commit is contained in:
Hongze Cheng 2022-05-12 11:36:09 +00:00
commit e06f05009b
38 changed files with 1561 additions and 833 deletions

View File

@ -107,7 +107,7 @@ int32_t create_topic() {
taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column with table as select ts, c1, c2, c3 from st1");
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
@ -166,6 +166,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);

View File

@ -43,7 +43,7 @@ extern int32_t tsMaxNumOfDistinctResults;
extern int32_t tsCompatibleModel;
extern bool tsEnableSlaveQuery;
extern bool tsPrintAuth;
extern int64_t tsTickPerDay[3];
extern int64_t tsTickPerMin[3];
// multi-process
extern bool tsMultiProcess;

View File

@ -2381,6 +2381,7 @@ typedef struct {
typedef struct {
SMsgHead head;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t withTbName;
int32_t epoch;
uint64_t reqId;
int64_t consumerId;

View File

@ -217,6 +217,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_UNKNOWN, "vnode-sync-unknown", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_COMMON_RESPONSE, "vnode-sync-common-response", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "vnode-sync-apply-msg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "vnode-sync-config-change", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_VNODE, "vnode-sync-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_VNODE, "vnode-alter-vnode", NULL, NULL)

View File

@ -39,15 +39,6 @@ extern "C" {
//======================================================================================
//begin API to taosd and qworker
enum {
UDFC_CODE_STOPPING = -1,
UDFC_CODE_PIPE_READ_ERR = -2,
UDFC_CODE_CONNECT_PIPE_ERR = -3,
UDFC_CODE_LOAD_UDF_FAILURE = -4,
UDFC_CODE_INVALID_STATE = -5,
UDFC_CODE_NO_PIPE = -6,
};
typedef void *UdfcFuncHandle;
/**
@ -89,6 +80,7 @@ typedef struct SUdfColumnData {
typedef struct SUdfColumn {
SUdfColumnMeta colMeta;
bool hasNull;
SUdfColumnData colData;
} SUdfColumn;
@ -232,6 +224,7 @@ static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) {
} else {
udfColDataSetNull_f(pColumn, row);
}
pColumn->hasNull = true;
}
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {

View File

@ -645,7 +645,16 @@ int32_t* taosGetErrno();
#define TSDB_CODE_FUNC_FUNTION_PARA_NUM TAOS_DEF_ERROR_CODE(0, 0x2801)
#define TSDB_CODE_FUNC_FUNTION_PARA_TYPE TAOS_DEF_ERROR_CODE(0, 0x2802)
#define TSDB_CODE_FUNC_FUNTION_PARA_VALUE TAOS_DEF_ERROR_CODE(0, 0x2803)
#define TSDB_CODE_FUNC_INVALID_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2604)
#define TSDB_CODE_FUNC_INVALID_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2804)
//udf
#define TSDB_CODE_UDF_STOPPING TAOS_DEF_ERROR_CODE(0, 0x2901)
#define TSDB_CODE_UDF_PIPE_READ_ERR TAOS_DEF_ERROR_CODE(0, 0x2902)
#define TSDB_CODE_UDF_PIPE_CONNECT_ERR TAOS_DEF_ERROR_CODE(0, 0x2903)
#define TSDB_CODE_UDF_PIPE_NO_PIPE TAOS_DEF_ERROR_CODE(0, 0x2904)
#define TSDB_CODE_UDF_LOAD_UDF_FAILURE TAOS_DEF_ERROR_CODE(0, 0x2905)
#define TSDB_CODE_UDF_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x2906)
#define TSDB_CODE_UDF_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2907)
#define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000)
#define TSDB_CODE_SML_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x3001)

View File

@ -59,6 +59,21 @@ static FORCE_INLINE void *taosDecodeFixedI8(const void *buf, int8_t *value) {
static FORCE_INLINE void *taosSkipFixedLen(const void *buf, size_t len) { return POINTER_SHIFT(buf, len); }
// --- Bool
static FORCE_INLINE int32_t taosEncodeFixedBool(void **buf, bool value) {
if (buf != NULL) {
((int8_t *)(*buf))[0] = value ? 1 : 0;
*buf = POINTER_SHIFT(*buf, sizeof(int8_t));
}
return (int32_t)sizeof(int8_t);
}
static FORCE_INLINE void *taosDecodeFixedBool(const void *buf, bool *value) {
*value = ((int8_t *)buf)[0] == 0 ? false : true;
return POINTER_SHIFT(buf, sizeof(int8_t));
}
// ---- Fixed U16
static FORCE_INLINE int32_t taosEncodeFixedU16(void **buf, uint16_t value) {
if (buf != NULL) {

View File

@ -61,12 +61,13 @@ struct tmq_conf_t {
char groupId[TSDB_CGROUP_LEN];
int8_t autoCommit;
int8_t resetOffset;
int8_t withTbName;
uint16_t port;
int32_t autoCommitInterval;
char* ip;
char* user;
char* pass;
char* db;
/*char* db;*/
tmq_commit_cb* commitCb;
void* commitCbUserParam;
};
@ -75,6 +76,7 @@ struct tmq_t {
// conf
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t withTbName;
int8_t autoCommit;
int32_t autoCommitInterval;
int32_t resetOffsetCfg;
@ -187,6 +189,7 @@ typedef struct {
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
conf->withTbName = -1;
conf->autoCommit = true;
conf->autoCommitInterval = 5000;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
@ -240,6 +243,18 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
if (strcmp(key, "msg.with.table.name") == 0) {
if (strcmp(value, "true") == 0) {
conf->withTbName = 1;
} else if (strcmp(value, "false") == 0) {
conf->withTbName = 0;
} else if (strcmp(value, "none") == 0) {
conf->withTbName = -1;
} else {
return TMQ_CONF_INVALID;
}
}
if (strcmp(key, "td.connect.ip") == 0) {
conf->ip = strdup(value);
return TMQ_CONF_OK;
@ -257,7 +272,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return TMQ_CONF_OK;
}
if (strcmp(key, "td.connect.db") == 0) {
conf->db = strdup(value);
/*conf->db = strdup(value);*/
return TMQ_CONF_OK;
}
@ -485,6 +500,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
// set conf
strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId);
pTmq->withTbName = conf->withTbName;
pTmq->autoCommit = conf->autoCommit;
pTmq->autoCommitInterval = conf->autoCommitInterval;
pTmq->commitCb = conf->commitCb;
@ -1104,6 +1120,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic*
pReq->subKey[tlen] = TMQ_SEPARATOR;
strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
pReq->withTbName = tmq->withTbName;
pReq->waitTime = waitTime;
pReq->consumerId = tmq->consumerId;
pReq->epoch = tmq->epoch;

View File

@ -1311,6 +1311,7 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
tlen += taosEncodeFixedI16(buf, pColData->info.colId);
tlen += taosEncodeFixedI16(buf, pColData->info.type);
tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
tlen += taosEncodeFixedBool(buf, pColData->hasNull);
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows);
@ -1340,6 +1341,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
buf = taosDecodeFixedI16(buf, &data.info.colId);
buf = taosDecodeFixedI16(buf, &data.info.type);
buf = taosDecodeFixedI32(buf, &data.info.bytes);
buf = taosDecodeFixedBool(buf, &data.hasNull);
if (IS_VAR_DATA_TYPE(data.info.type)) {
buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t));

View File

@ -153,11 +153,11 @@ bool tsStreamSchedV = true;
/*
* minimum scale for whole system, millisecond by default
* for TSDB_TIME_PRECISION_MILLI: 86400000L
* TSDB_TIME_PRECISION_MICRO: 86400000000L
* TSDB_TIME_PRECISION_NANO: 86400000000000L
* for TSDB_TIME_PRECISION_MILLI: 60000L
* TSDB_TIME_PRECISION_MICRO: 60000000L
* TSDB_TIME_PRECISION_NANO: 60000000000L
*/
int64_t tsTickPerDay[] = {86400000L, 86400000000L, 86400000000000L};
int64_t tsTickPerMin[] = {60000L, 60000000L, 60000000000L};
// lossy compress 6
char tsLossyColumns[32] = ""; // "float|double" means all float and double columns can be lossy compressed. set empty
@ -444,7 +444,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "startUdfd", tsStartUdfd, 0) != 0) return -1;
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
return 0;
}
@ -585,7 +585,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
tsStartUdfd = cfgGetItem(pCfg, "startUdfd")->bval;
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;

View File

@ -144,11 +144,12 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->szCache = pCreate->pages;
pCfg->szBuf = pCreate->buffer * 1024 * 1024;
pCfg->isWeak = true;
pCfg->tsdbCfg.compression = pCreate->compression;
pCfg->tsdbCfg.precision = pCreate->precision;
pCfg->tsdbCfg.days = 10;
pCfg->tsdbCfg.keep0 = 3650;
pCfg->tsdbCfg.keep1 = 3650;
pCfg->tsdbCfg.keep2 = 3650;
pCfg->tsdbCfg.days = pCreate->daysPerFile;
pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
pCfg->tsdbCfg.minRows = pCreate->minRows;
pCfg->tsdbCfg.maxRows = pCreate->maxRows;
for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {

View File

@ -518,9 +518,9 @@ void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
if (key < 0) {
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
return (int)((key + 1) / tsTickPerMin[precision] / days - 1);
} else {
return (int)((key / tsTickPerDay[precision] / days));
return (int)((key / tsTickPerMin[precision] / days));
}
}
@ -770,8 +770,8 @@ static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest) {
}
static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey) {
*minKey = fid * days * tsTickPerDay[precision];
*maxKey = *minKey + days * tsTickPerDay[precision] - 1;
*minKey = fid * days * tsTickPerMin[precision];
*maxKey = *minKey + days * tsTickPerMin[precision] - 1;
}
static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet *pSet) {

View File

@ -427,13 +427,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset;
rsp.withSchema = pExec->withSchema;
rsp.withTbName = pExec->withTbName;
rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
int8_t withTbName = pExec->withTbName;
if (pReq->withTbName != -1) {
withTbName = pReq->withTbName;
}
rsp.withTbName = withTbName;
while (1) {
consumerEpoch = atomic_load_32(&pExec->epoch);
if (consumerEpoch > reqEpoch) {
@ -538,7 +543,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayPush(rsp.blockSchema, &pSW);
}
if (pExec->withTbName) {
if (withTbName) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
@ -578,7 +583,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
if (pExec->withTbName) {
if (withTbName) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) {

View File

@ -216,9 +216,9 @@ void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision];
midKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision];
maxKey = now - pCfg->keep0 * tsTickPerDay[pCfg->precision];
minKey = now - pCfg->keep2 * tsTickPerMin[pCfg->precision];
midKey = now - pCfg->keep1 * tsTickPerMin[pCfg->precision];
maxKey = now - pCfg->keep0 * tsTickPerMin[pCfg->precision];
pRtn->minKey = minKey;
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->days, pCfg->precision));
@ -948,7 +948,6 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
pCommith->pTable = pTable;
if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) {

View File

@ -323,7 +323,7 @@ static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdb);
int64_t now = taosGetTimestamp(pCfg->precision);
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
return now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
}
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableDataCond* pCond) {
@ -1047,10 +1047,10 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
}
if (key < 0) {
key -= (daysPerFile * tsTickPerDay[precision]);
key -= (daysPerFile * tsTickPerMin[precision]);
}
int64_t fid = (int64_t)(key / (daysPerFile * tsTickPerDay[precision])); // set the starting fileId
int64_t fid = (int64_t)(key / (daysPerFile * tsTickPerMin[precision])); // set the starting fileId
if (fid < 0L && llabs(fid) > INT32_MAX) { // data value overflow for INT32
fid = INT32_MIN;
}

View File

@ -1017,7 +1017,7 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe
int32_t daysPerFile = pCfg->days;
if (storageLevel == SMA_STORAGE_LEVEL_TSDB) {
int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]);
int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerMin[pCfg->precision]);
daysPerFile = days > SMA_STORAGE_TSDB_DAYS ? days : SMA_STORAGE_TSDB_DAYS;
}

View File

@ -63,8 +63,8 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
STSRow *row = NULL;
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pTsdb);
TSKEY now = taosGetTimestamp(pCfg->precision);
TSKEY minKey = now - tsTickPerDay[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = now + tsTickPerDay[pCfg->precision] * pCfg->days;
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = now + tsTickPerMin[pCfg->precision] * pCfg->days;
terrno = TSDB_CODE_SUCCESS;
// pMsg->length = htonl(pMsg->length);

View File

@ -562,6 +562,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
int32_t nRows;
int32_t tsize, ret;
SEncoder encoder = {0};
terrno = TSDB_CODE_SUCCESS;
pRsp->code = 0;
@ -576,6 +577,11 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
}
submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp));
if (!submitRsp.pArray) {
pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (int i = 0;;) {
tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
@ -639,7 +645,12 @@ _exit:
taosArrayDestroy(submitRsp.pArray);
// TODO: the partial success scenario and the error case
// TODO: refactor
if ((terrno == TSDB_CODE_SUCCESS || terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) &&
(pRsp->code == TSDB_CODE_SUCCESS)) {
tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
}
return 0;
}

View File

@ -695,6 +695,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
udfCol->colMeta.scale = col->info.scale;
udfCol->colMeta.precision = col->info.precision;
udfCol->colData.numOfRows = udfBlock->numOfRows;
udfCol->hasNull = col->hasNull;
if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
@ -731,6 +732,7 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
col->info.bytes = meta->bytes;
col->info.scale = meta->scale;
col->info.type = meta->type;
col->hasNull = udfCol->hasNull;
SUdfColumnData *data = &udfCol->colData;
if (!IS_VAR_DATA_TYPE(meta->type)) {
@ -929,7 +931,7 @@ void udfcUvHandleError(SClientUvConn *conn) {
while (!QUEUE_EMPTY(&conn->taskQueue)) {
QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
task->errCode = UDFC_CODE_PIPE_READ_ERR;
task->errCode = TSDB_CODE_UDF_PIPE_READ_ERR;
QUEUE_REMOVE(&task->connTaskQueue);
QUEUE_REMOVE(&task->procTaskQueue);
uv_sem_post(&task->taskSem);
@ -1117,7 +1119,7 @@ void cleanUpUvTasks(SUdfdProxy *udfc) {
QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
task->errCode = UDFC_CODE_STOPPING;
task->errCode = TSDB_CODE_UDF_STOPPING;
}
uv_sem_post(&task->taskSem);
}
@ -1127,7 +1129,7 @@ void cleanUpUvTasks(SUdfdProxy *udfc) {
QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue);
if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
task->errCode = UDFC_CODE_STOPPING;
task->errCode = TSDB_CODE_UDF_STOPPING;
}
uv_sem_post(&task->taskSem);
}
@ -1211,7 +1213,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
fnInfo("udfc setup udf. udfName: %s", udfName);
if (gUdfdProxy.gUdfcState != UDFC_STATE_READY) {
return UDFC_CODE_INVALID_STATE;
return TSDB_CODE_UDF_INVALID_STATE;
}
SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask));
task->errCode = 0;
@ -1225,7 +1227,7 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
if (errCode != 0) {
fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
return UDFC_CODE_CONNECT_PIPE_ERR;
return TSDB_CODE_UDF_PIPE_CONNECT_ERR;
}
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
@ -1252,7 +1254,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
SClientUdfUvSession *session = (SClientUdfUvSession *) handle;
if (session->udfUvPipe == NULL) {
fnError("No pipe to udfd");
return UDFC_CODE_NO_PIPE;
return TSDB_CODE_UDF_PIPE_NO_PIPE;
}
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
task->errCode = 0;
@ -1372,7 +1374,7 @@ int32_t teardownUdf(UdfcFuncHandle handle) {
SClientUdfUvSession *session = (SClientUdfUvSession *) handle;
if (session->udfUvPipe == NULL) {
fnError("pipe to udfd does not exist");
return UDFC_CODE_NO_PIPE;
return TSDB_CODE_UDF_PIPE_NO_PIPE;
}
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
@ -1495,7 +1497,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
taosArrayDestroy(tempBlock.pDataBlock);
taosMemoryFree(newState.buf);
return TSDB_CODE_SUCCESS;
return udfCode;
}
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {

View File

@ -102,7 +102,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
int err = uv_dlopen(udf->path, &udf->lib);
if (err != 0) {
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
return UDFC_CODE_LOAD_UDF_FAILURE;
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
}
char initFuncName[TSDB_FUNC_NAME_LEN+5] = {0};
@ -140,20 +140,14 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
return 0;
}
void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
SUdfRequest request = {0};
decodeUdfRequest(uvUdf->input.base, &request);
switch (request.type) {
case UDF_TASK_SETUP: {
void udfdProcessSetupRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
// TODO: tracable id from client. connect, setup, call, teardown
fnInfo("%" PRId64 " setup request. udf name: %s", request.seqNum, request.setup.udfName);
SUdfSetupRequest *setup = &request.setup;
fnInfo("%" PRId64 " setup request. udf name: %s", request->seqNum, request->setup.udfName);
SUdfSetupRequest *setup = &request->setup;
int32_t code = TSDB_CODE_SUCCESS;
SUdf *udf = NULL;
uv_mutex_lock(&global.udfsMutex);
SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName));
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
if (udfInHash) {
++(*udfInHash)->refCount;
udf = *udfInHash;
@ -166,14 +160,14 @@ void udfdProcessRequest(uv_work_t *req) {
uv_mutex_init(&udfNew->lock);
uv_cond_init(&udfNew->condReady);
udf = udfNew;
taosHashPut(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName), &udfNew, sizeof(&udfNew));
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew));
uv_mutex_unlock(&global.udfsMutex);
}
uv_mutex_lock(&udf->lock);
if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING;
udfdLoadUdf(setup->udfName, udf);
code = udfdLoadUdf(setup->udfName, udf);
if (udf->initFunc) {
udf->initFunc();
}
@ -188,14 +182,16 @@ void udfdProcessRequest(uv_work_t *req) {
}
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
handle->udf = udf;
SUdfResponse rsp;
rsp.seqNum = request.seqNum;
rsp.type = request.type;
rsp.code = 0;
rsp.seqNum = request->seqNum;
rsp.type = request->type;
rsp.code = code;
rsp.setupRsp.udfHandle = (int64_t)(handle);
rsp.setupRsp.outputType = udf->outputType;
rsp.setupRsp.outputLen = udf->outputLen;
rsp.setupRsp.bufSize = udf->bufSize;
int32_t len = encodeUdfResponse(NULL, &rsp);
rsp.msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
@ -205,12 +201,12 @@ void udfdProcessRequest(uv_work_t *req) {
uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base);
break;
return;
}
case UDF_TASK_CALL: {
SUdfCallRequest *call = &request.call;
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request.seqNum, call->callType,
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfCallRequest *call = &request->call;
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType,
call->udfHandle);
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
SUdf *udf = handle->udf;
@ -218,13 +214,14 @@ void udfdProcessRequest(uv_work_t *req) {
SUdfResponse *rsp = &response;
SUdfCallResponse *subRsp = &rsp->callRsp;
int32_t code = TSDB_CODE_SUCCESS;
switch(call->callType) {
case TSDB_UDF_CALL_SCALA_PROC: {
SUdfColumn output = {0};
SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input);
udf->scalarProcFunc(&input, &output);
code = udf->scalarProcFunc(&input, &output);
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
freeUdfColumn(&output);
@ -244,7 +241,7 @@ void udfdProcessRequest(uv_work_t *req) {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
.bufLen= udf->bufSize,
.numOfResult = 0};
udf->aggProcFunc(&input, &call->interBuf, &outBuf);
code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
subRsp->resultBuf = outBuf;
break;
@ -253,7 +250,7 @@ void udfdProcessRequest(uv_work_t *req) {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
.bufLen= udf->bufSize,
.numOfResult = 0};
udf->aggFinishFunc(&call->interBuf, &outBuf);
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
subRsp->resultBuf = outBuf;
break;
}
@ -261,9 +258,9 @@ void udfdProcessRequest(uv_work_t *req) {
break;
}
rsp->seqNum = request.seqNum;
rsp->type = request.type;
rsp->code = 0;
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = code;
subRsp->callType = call->callType;
int32_t len = encodeUdfResponse(NULL, rsp);
@ -274,14 +271,17 @@ void udfdProcessRequest(uv_work_t *req) {
uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base);
break;
return;
}
case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = &request.teardown;
fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request.seqNum, teardown->udfHandle) SUdfcFuncHandle *handle =
(SUdfcFuncHandle *)(teardown->udfHandle);
void udfdProcessTeardownRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
SUdfTeardownRequest *teardown = &request->teardown;
fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request->seqNum, teardown->udfHandle);
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
SUdf *udf = handle->udf;
bool unloadUdf = false;
int32_t code = TSDB_CODE_SUCCESS;
uv_mutex_lock(&global.udfsMutex);
udf->refCount--;
if (udf->refCount == 0) {
@ -302,9 +302,9 @@ void udfdProcessRequest(uv_work_t *req) {
SUdfResponse response;
SUdfResponse *rsp = &response;
rsp->seqNum = request.seqNum;
rsp->type = request.type;
rsp->code = 0;
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = code;
int32_t len = encodeUdfResponse(NULL, rsp);
rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
@ -313,6 +313,26 @@ void udfdProcessRequest(uv_work_t *req) {
uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base);
return;
}
void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
SUdfRequest request = {0};
decodeUdfRequest(uvUdf->input.base, &request);
switch (request.type) {
case UDF_TASK_SETUP: {
udfdProcessSetupRequest(uvUdf, &request);
break;
}
case UDF_TASK_CALL: {
udfdProcessCallRequest(uvUdf, &request);
break;
}
case UDF_TASK_TEARDOWN: {
udfdProcessTeardownRequest(uvUdf, &request);
break;
}
default: {

View File

@ -27,6 +27,12 @@ int32_t udf2_start(SUdfInterBuf *buf) {
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
int64_t sumSquares = *(int64_t*)interBuf->buf;
int8_t numOutput = 0;
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols[i];
if (col->colMeta.type != TSDB_DATA_TYPE_INT) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
}
for (int32_t i = 0; i < block->numOfCols; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++j) {
SUdfColumn* col = block->udfCols[i];

View File

@ -153,11 +153,6 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
.handle = pInfo->msgInfo.handle,
.persistHandle = persistHandle,
.code = 0};
if (pInfo->msgType == TDMT_VND_QUERY || pInfo->msgType == TDMT_VND_FETCH ||
pInfo->msgType == TDMT_VND_QUERY_CONTINUE) {
rpcMsg.persistHandle = 1;
}
assert(pInfo->fp != NULL);
rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
@ -204,6 +199,3 @@ SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* nam
tstrncpy(s.name, name, tListLen(s.name));
return s;
}

View File

@ -271,6 +271,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig);
SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode);

View File

@ -57,6 +57,11 @@ SyncIndex syncUtilMinIndex(SyncIndex a, SyncIndex b);
SyncIndex syncUtilMaxIndex(SyncIndex a, SyncIndex b);
void syncUtilMsgHtoN(void* msg);
void syncUtilMsgNtoH(void* msg);
bool syncUtilIsData(tmsg_t msgType);
bool syncUtilUserPreCommit(tmsg_t msgType);
bool syncUtilUserCommit(tmsg_t msgType);
bool syncUtilUserRollback(tmsg_t msgType);
#ifdef __cplusplus
}

View File

@ -19,6 +19,7 @@
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
#include "syncRaftCfg.h"
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
@ -199,7 +200,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL);
if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
//if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
@ -227,7 +229,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
//if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index;
cbMeta.isWeak = pAppendEntry->isWeak;
@ -258,7 +261,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
//if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index;
cbMeta.isWeak = pAppendEntry->isWeak;
@ -320,7 +324,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
//if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
cbMeta.isWeak = pEntry->isWeak;
@ -330,6 +335,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
// config change
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
ASSERT(ret == 0);
syncNodeUpdateConfig(ths, &newSyncCfg);
}
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}

View File

@ -19,6 +19,7 @@
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncRaftCfg.h"
// \* Leader i advances its commitIndex.
// \* This is done as a separate step from handling AppendEntries responses,
@ -101,7 +102,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
//if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
cbMeta.isWeak = pEntry->isWeak;
@ -111,6 +113,15 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
}
// config change
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
ASSERT(ret == 0);
syncNodeUpdateConfig(pSyncNode, &newSyncCfg);
}
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}

View File

@ -116,6 +116,15 @@ void syncStop(int64_t rid) {
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
int32_t ret = 0;
char *configChange = syncCfg2Str((SSyncCfg*)pSyncCfg);
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE;
rpcMsg.noResp = 1;
rpcMsg.contLen = strlen(configChange) + 1;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", configChange);
taosMemoryFree(configChange);
ret = syncPropose(rid, &rpcMsg, false);
return ret;
}
@ -849,6 +858,35 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
return s;
}
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig) {
pSyncNode->pRaftCfg->cfg = *newConfig;
int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg);
ASSERT(ret == 0);
// init internal
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
// init peersNum, peers, peersId
pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
int j = 0;
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
j++;
}
}
for (int i = 0; i < pSyncNode->peersNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
}
// init replicaNum, replicasId
pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
}
}
SSyncNode* syncNodeAcquire(int64_t rid) {
SSyncNode* pNode = taosAcquireRef(tsNodeRefId, rid);
if (pNode == NULL) {
@ -1207,7 +1245,8 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
//if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
cbMeta.isWeak = pEntry->isWeak;
@ -1228,7 +1267,8 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
//if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
cbMeta.isWeak = pEntry->isWeak;

View File

@ -213,3 +213,31 @@ void syncUtilMsgNtoH(void* msg) {
pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = ntohl(pHead->vgId);
}
bool syncUtilIsData(tmsg_t msgType) {
if (msgType == TDMT_VND_SYNC_NOOP || msgType == TDMT_VND_SYNC_CONFIG_CHANGE) {
return false;
}
return true;
}
bool syncUtilUserPreCommit(tmsg_t msgType) {
if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) {
return true;
}
return false;
}
bool syncUtilUserCommit(tmsg_t msgType) {
if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) {
return true;
}
return false;
}
bool syncUtilUserRollback(tmsg_t msgType) {
if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) {
return true;
}
return false;
}

View File

@ -65,11 +65,18 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("failed to seek idx file, ver %ld, pos: %ld, since %s", ver, offset, terrstr());
return -1;
}
SWalIdxEntry entry;
if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) {
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("failed to read idx file, since %s", terrstr());
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
wError("read idx file incompletely, read bytes %d, bytes should be %lu", ret, sizeof(SWalIdxEntry));
}
return -1;
}
@ -77,6 +84,7 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("failed to seek log file, ver %ld, pos: %ld, since %s", ver, entry.offset, terrstr());
return -1;
}
return ret;
@ -92,6 +100,8 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ);
if (pLogTFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
terrno = TSDB_CODE_WAL_INVALID_VER;
wError("cannot open file %s, since %s", fnameStr, terrstr());
return -1;
}
@ -99,6 +109,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
if (pIdxTFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("cannot open file %s, since %s", fnameStr, terrstr());
return -1;
}
@ -113,6 +124,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
return 0;
}
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
wError("invalid version: % " PRId64 ", first ver %ld, last ver %ld", ver, pWal->vers.firstVer, pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1;
}
@ -125,12 +137,13 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
if (pRead->curFileFirstVer != pRet->firstVer) {
// error code set inner
if (walReadChangeFile(pRead, pRet->firstVer) < 0) {
return -1;
}
}
// code set inner
// error code set inner
if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) {
return -1;
}
@ -155,9 +168,7 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
if (code < 0) return -1;
}
if (!taosValidFile(pRead->pReadLogTFile)) {
return -1;
}
ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalHead));
if (code != sizeof(SWalHead)) {
@ -250,15 +261,12 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
// TODO: check wal life
if (pRead->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) {
terrno = TSDB_CODE_WAL_INVALID_VER;
wError("unexpected wal log version: % " PRId64 ", since seek error", ver);
wError("unexpected wal log version: % " PRId64 ", since %s", ver, terrstr());
return -1;
}
}
/*if (!taosValidFile(pRead->pReadLogTFile)) {*/
/*return -1;*/
/*}*/
ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead));
if (code != sizeof(SWalHead)) {

View File

@ -454,6 +454,15 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PERMISSION_DENIED, "Permission denied")
//planner
TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "planner internal error")
//udf
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_READ_ERR, "udf pipe read error")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_CONNECT_ERR, "udf pipe connect error")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid function input")
//schemaless
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PRECISION_TYPE, "Invalid timestamp precision type")

View File

@ -150,6 +150,9 @@ class TDSql:
raise Exception(repr(e))
return (self.queryRows, timeout)
def getRows(self):
return self.queryRows
def checkRows(self, expectRows):
if self.queryRows == expectRows:
tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectRows))

View File

@ -346,8 +346,10 @@ class TDTestCase:
return
def test_case3(self):
self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 8, 1*10000)
# self.taosBenchCreate("test209","no","db2", "stb2", 1, 8, 1*10000)
# self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*10000)
self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*1000)
# self.taosBenchCreate("db1", "stb1", 4, 5, 100*10000)
# self.taosBenchCreate("db1", "stb1", 1, 5, 100*10000)

View File

@ -29,8 +29,8 @@
"batch_create_tbl_num": 50000,
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 0,
"interlace_rows": 0,
"insert_rows": 10,
"interlace_rows": 100000,
"insert_interval": 0,
"max_sql_len": 10000000,
"disorder_ratio": 0,

View File

@ -93,104 +93,124 @@ class TDTestCase:
res = tdSql.query(query_sql.replace('*', 'last(*)'), True)
return int(res[0][-4])
def queryTsCol(self, tb_name):
def queryTsCol(self, tb_name, check_elm=None):
select_elm = "*" if check_elm is None else check_elm
# ts and ts
query_sql = f'select * from {tb_name} where ts > "2021-01-11 12:00:00" or ts < "2021-01-13 12:00:00"'
query_sql = f'select {select_elm} from {tb_name} where ts > "2021-01-11 12:00:00" or ts < "2021-01-13 12:00:00"'
tdSql.query(query_sql)
tdSql.checkRows(11)
tdSql.checkEqual(self.queryLastC10(query_sql), 11)
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
query_sql = f'select * from {tb_name} where ts >= "2021-01-11 12:00:00" and ts <= "2021-01-13 12:00:00"'
query_sql = f'select {select_elm} from {tb_name} where ts >= "2021-01-11 12:00:00" and ts <= "2021-01-13 12:00:00"'
tdSql.query(query_sql)
# tdSql.checkRows(2)
# tdSql.checkEqual(self.queryLastC10(query_sql), 6)
tdSql.checkRows(2)
tdSql.checkEqual(self.queryLastC10(query_sql), 6) if select_elm == "*" else False
## ts or and tinyint col
query_sql = f'select * from {tb_name} where ts > "2021-01-11 12:00:00" or c1 = 2'
tdSql.error(query_sql)
query_sql = f'select {select_elm} from {tb_name} where ts > "2021-01-11 12:00:00" or c1 = 2'
tdSql.query(query_sql)
tdSql.checkRows(7)
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
query_sql = f'select * from {tb_name} where ts <= "2021-01-11 12:00:00" and c1 != 2'
query_sql = f'select {select_elm} from {tb_name} where ts <= "2021-01-11 12:00:00" and c1 != 2'
tdSql.query(query_sql)
tdSql.checkRows(4)
tdSql.checkEqual(self.queryLastC10(query_sql), 5)
tdSql.checkEqual(self.queryLastC10(query_sql), 5) if select_elm == "*" else False
## ts or and smallint col
query_sql = f'select * from {tb_name} where ts <> "2021-01-11 12:00:00" or c2 = 10'
tdSql.error(query_sql)
query_sql = f'select {select_elm} from {tb_name} where ts <> "2021-01-11 12:00:00" or c2 = 10'
tdSql.query(query_sql)
tdSql.checkRows(10)
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
query_sql = f'select * from {tb_name} where ts <= "2021-01-11 12:00:00" and c2 <= 1'
query_sql = f'select {select_elm} from {tb_name} where ts <= "2021-01-11 12:00:00" and c2 <= 1'
tdSql.query(query_sql)
tdSql.checkRows(1)
tdSql.checkEqual(self.queryLastC10(query_sql), 1)
tdSql.checkEqual(self.queryLastC10(query_sql), 1) if select_elm == "*" else False
## ts or and int col
query_sql = f'select * from {tb_name} where ts >= "2021-01-11 12:00:00" or c3 = 4'
tdSql.error(query_sql)
query_sql = f'select {select_elm} from {tb_name} where ts >= "2021-01-11 12:00:00" or c3 = 4'
tdSql.query(query_sql)
tdSql.checkRows(8)
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" and c3 = 4'
query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" and c3 = 4'
tdSql.query(query_sql)
tdSql.checkRows(1)
tdSql.checkEqual(self.queryLastC10(query_sql), 4)
tdSql.checkEqual(self.queryLastC10(query_sql), 4) if select_elm == "*" else False
## ts or and big col
query_sql = f'select * from {tb_name} where ts is Null or c4 = 5'
tdSql.error(query_sql)
query_sql = f'select * from {tb_name} where ts is not Null and c4 = 2'
query_sql = f'select {select_elm} from {tb_name} where ts is Null or c4 = 5'
tdSql.query(query_sql)
tdSql.checkRows(1)
tdSql.checkEqual(self.queryLastC10(query_sql), 3)
tdSql.checkEqual(self.queryLastC10(query_sql), 5) if select_elm == "*" else False
query_sql = f'select {select_elm} from {tb_name} where ts is not Null and c4 = 2'
tdSql.query(query_sql)
tdSql.checkRows(1)
tdSql.checkEqual(self.queryLastC10(query_sql), 3) if select_elm == "*" else False
## ts or and float col
query_sql = f'select * from {tb_name} where ts between "2021-01-17 12:00:00" and "2021-01-23 12:00:00" or c5 = 6.6'
tdSql.error(query_sql)
query_sql = f'select {select_elm} from {tb_name} where ts between "2021-01-17 12:00:00" and "2021-01-23 12:00:00" or c5 = 6.6'
tdSql.query(query_sql)
tdSql.checkRows(5)
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" and c5 = 1.1'
query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" and c5 = 1.1'
tdSql.query(query_sql)
tdSql.checkRows(4)
tdSql.checkEqual(self.queryLastC10(query_sql), 4)
tdSql.checkEqual(self.queryLastC10(query_sql), 4) if select_elm == "*" else False
## ts or and double col
query_sql = f'select * from {tb_name} where ts between "2021-01-17 12:00:00" and "2021-01-23 12:00:00" or c6 = 7.7'
tdSql.error(query_sql)
query_sql = f'select {select_elm} from {tb_name} where ts between "2021-01-17 12:00:00" and "2021-01-23 12:00:00" or c6 = 7.7'
tdSql.query(query_sql)
tdSql.checkRows(5)
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" and c6 = 1.1'
query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" and c6 = 1.1'
tdSql.query(query_sql)
tdSql.checkRows(4)
tdSql.checkEqual(self.queryLastC10(query_sql), 4)
tdSql.checkEqual(self.queryLastC10(query_sql), 4) if select_elm == "*" else False
## ts or and binary col
query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" or c7 like "binary_"'
tdSql.error(query_sql)
query_sql = f'select * from {tb_name} where ts <= "2021-01-11 12:00:00" and c7 in ("binary")'
query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" or c7 like "binary_"'
tdSql.query(query_sql)
tdSql.checkRows(5)
tdSql.checkEqual(self.queryLastC10(query_sql), 5)
tdSql.checkEqual(self.queryLastC10(query_sql), 8) if select_elm == "*" else False
query_sql = f'select {select_elm} from {tb_name} where ts <= "2021-01-11 12:00:00" and c7 in ("binary")'
tdSql.query(query_sql)
tdSql.checkRows(5)
tdSql.checkEqual(self.queryLastC10(query_sql), 5) if select_elm == "*" else False
## ts or and nchar col
query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" or c8 like "nchar%"'
tdSql.error(query_sql)
query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" or c8 like "nchar%"'
tdSql.query(query_sql)
tdSql.checkRows(10)
tdSql.checkEqual(self.queryLastC10(query_sql), 10) if select_elm == "*" else False
query_sql = f'select * from {tb_name} where ts >= "2021-01-11 12:00:00" and c8 is Null'
query_sql = f'select {select_elm} from {tb_name} where ts >= "2021-01-11 12:00:00" and c8 is Null'
tdSql.query(query_sql)
tdSql.checkRows(1)
tdSql.checkEqual(self.queryLastC10(query_sql), 11)
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
## ts or and bool col
query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" or c9=false'
tdSql.error(query_sql)
query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" or c9=false'
tdSql.query(query_sql)
tdSql.checkRows(6)
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
query_sql = f'select * from {tb_name} where ts >= "2021-01-11 12:00:00" and c9=true'
query_sql = f'select {select_elm} from {tb_name} where ts >= "2021-01-11 12:00:00" and c9=true'
tdSql.query(query_sql)
tdSql.checkRows(5)
tdSql.checkEqual(self.queryLastC10(query_sql), 9)
tdSql.checkEqual(self.queryLastC10(query_sql), 9) if select_elm == "*" else False
## multi cols
query_sql = f'select * from {tb_name} where ts > "2021-01-03 12:00:00" and c1 != 2 and c2 >= 2 and c3 <> 4 and c4 < 4 and c5 > 1 and c6 >= 1.1 and c7 is not Null and c8 = "nchar" and c9=false'
query_sql = f'select {select_elm} from {tb_name} where ts > "2021-01-03 12:00:00" and c1 != 2 and c2 >= 2 and c3 <> 4 and c4 < 4 and c5 > 1 and c6 >= 1.1 and c7 is not Null and c8 = "nchar" and c9=false'
tdSql.query(query_sql)
tdSql.checkRows(1)
tdSql.checkEqual(self.queryLastC10(query_sql), 10)
tdSql.checkEqual(self.queryLastC10(query_sql), 10) if select_elm == "*" else False
def queryTsTag(self, tb_name):
## ts and tinyint col
@ -2029,12 +2049,12 @@ class TDTestCase:
tb_name = self.initStb()
self.queryFullTagType(tb_name)
def checkTbTsCol(self):
def checkTbTsCol(self, check_elm):
'''
Ordinary table ts and col check
'''
tb_name = self.initTb()
self.queryTsCol(tb_name)
self.queryTsCol(tb_name, check_elm)
def checkStbTsTol(self):
tb_name = self.initStb()
@ -2112,8 +2132,8 @@ class TDTestCase:
for check_elm in [None, column_name]:
self.checkTbColTypeOperator(check_elm)
self.checkStbColTypeOperator(check_elm)
self.checkTbTsCol(check_elm)
# self.checkStbTagTypeOperator()
# self.checkTbTsCol()
# self.checkStbTsTol()
# self.checkStbTsTag()
# self.checkStbTsColTag()

View File

@ -13,14 +13,12 @@ from util.dnodes import *
class TDTestCase:
hostname = socket.gethostname()
rpcDebugFlagVal = '143'
clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
print ("===================: ", updatecfgDict)
#rpcDebugFlagVal = '143'
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#print ("===================: ", updatecfgDict)
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
@ -43,27 +41,35 @@ class TDTestCase:
break
return buildPath
def create_tables(self,dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tdSql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
tdSql.execute("use %s" %dbName)
tdSql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
def newcur(self,cfg,host,port):
user = "root"
password = "taosdata"
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
cur=con.cursor()
print(cur)
return cur
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
tsql.execute("use %s" %dbName)
tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
pre_create = "create table"
sql = pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
for i in range(ctbNum):
sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1)
if (i > 0) and (i%100 == 0):
tdSql.execute(sql)
tsql.execute(sql)
sql = pre_create
if sql != pre_create:
tdSql.execute(sql)
tsql.execute(sql)
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
return
def insert_data(self,dbName,stbName,ctbNum,rowsPerTbl,startTs):
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............")
tdSql.execute("use %s" %dbName)
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
@ -72,30 +78,37 @@ class TDTestCase:
sql += " %s_%d values "%(stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
if (j > 0) and (j%2000 == 0):
tdSql.execute(sql)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s_%d values " %(stbName,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
# print(sql)
print("sql:%s"%sql)
tdSql.execute(sql)
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def prepareEnv(self, **parameterDict):
print ("input parameters:")
print (parameterDict)
self.create_tables(parameterDict["dbName"],\
# create new connector for my thread
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
self.create_tables(tsql,\
parameterDict["dbName"],\
parameterDict["vgroups"],\
parameterDict["stbName"],\
parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"])
self.insert_data(parameterDict["dbName"],\
self.insert_data(tsql,\
parameterDict["dbName"],\
parameterDict["stbName"],\
parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],\
parameterDict["batchNum"],\
parameterDict["startTs"])
return
@ -113,18 +126,115 @@ class TDTestCase:
tdLog.printNoPrefix("======== test scenario 1: ")
tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread
parameterDict = {'dbName': 'db', \
parameterDict = {'cfg': '', \
'dbName': 'db', \
'vgroups': 1, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 10, \
'rowsPerTbl': 10000, \
'batchNum': 10, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start()
time.sleep(2)
# wait stb ready
while 1:
#tdSql.query("show %s.stables"%parameterDict['dbName'])
tdSql.query("show db.stables")
#print (self.queryResult)
#print (tdSql.getRows())
if tdSql.getRows() == 1:
break
else:
time.sleep(1)
tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column'
topicFromCtb = 'topic_ctb_column'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
time.sleep(1)
tdSql.query("show topics")
print ("======================================")
#print (self.queryResult)
#tdSql.checkRows(2)
topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0)
print (topic1)
print (topic2)
print (topicFromStb)
print (topicFromCtb)
#tdLog.info("show topics: %s, %s"%topic1, topic2)
#if topic1 != topicFromStb or topic1 != topicFromCtb:
# tdLog.exit("topic error1")
#if topic2 != topicFromStb or topic2 != topicFromCtb:
# tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"]
#tdSql.query("create database %s"%cdbName)
#tdSql.query("use %s"%cdbName)
tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)")
tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)")
consumerId = 0
expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
topicList = topicFromStb
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
tdSql.query(sql)
tdLog.info("check stb if there are data")
while 1:
tdSql.query("select count(*) from %s"%parameterDict["stbName"])
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
countOfStb = tdSql.getData(0, 0)
if countOfStb != 0:
tdLog.info("count from stb: %d"%countOfStb)
break
else:
time.sleep(1)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
# wait for data ready
prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result")
while 1:
tdSql.query("select * from consumeresult")
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 1:
break
else:
time.sleep(5)
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt)
tdLog.printNoPrefix("======== test scenario 2: ")

View File

@ -0,0 +1,372 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
class TDTestCase:
hostname = socket.gethostname()
#rpcDebugFlagVal = '143'
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#print ("===================: ", updatecfgDict)
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
#tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), logSql) # output sql.txt file
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def newcur(self,cfg,host,port):
user = "root"
password = "taosdata"
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
cur=con.cursor()
print(cur)
return cur
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
tsql.execute("use %s" %dbName)
tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
pre_create = "create table"
sql = pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
for i in range(ctbNum):
sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1)
if (i > 0) and (i%100 == 0):
tsql.execute(sql)
sql = pre_create
if sql != pre_create:
tsql.execute(sql)
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
return
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
sql += " %s_%d values "%(stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s_%d values " %(stbName,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def prepareEnv(self, **parameterDict):
print ("input parameters:")
print (parameterDict)
# create new connector for my thread
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
self.create_tables(tsql,\
parameterDict["dbName"],\
parameterDict["vgroups"],\
parameterDict["stbName"],\
parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"])
self.insert_data(tsql,\
parameterDict["dbName"],\
parameterDict["stbName"],\
parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],\
parameterDict["batchNum"],\
parameterDict["startTs"])
return
def run(self):
tdSql.prepare()
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
cfgPath = buildPath + "/../sim/psim/cfg"
tdLog.info("cfgPath: %s" % cfgPath)
tdLog.printNoPrefix("======== test scenario 1: ")
tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread
parameterDict = {'cfg': '', \
'dbName': 'db', \
'vgroups': 1, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 10, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start()
time.sleep(2)
# wait stb ready
while 1:
tdSql.query("show %s.stables"%parameterDict['dbName'])
if tdSql.getRows() == 1:
break
else:
time.sleep(1)
tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column'
topicFromCtb = 'topic_ctb_column'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
time.sleep(1)
tdSql.query("show topics")
#tdSql.checkRows(2)
topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0)
print (topic1)
print (topic2)
print (topicFromStb)
print (topicFromCtb)
#tdLog.info("show topics: %s, %s"%topic1, topic2)
#if topic1 != topicFromStb or topic1 != topicFromCtb:
# tdLog.exit("topic error1")
#if topic2 != topicFromStb or topic2 != topicFromCtb:
# tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"]
tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)")
tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)")
consumerId = 0
expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
topicList = topicFromStb
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
tdSql.query(sql)
tdLog.info("check stb if there are data")
while 1:
tdSql.query("select count(*) from %s"%parameterDict["stbName"])
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
countOfStb = tdSql.getData(0, 0)
if countOfStb != 0:
tdLog.info("count from stb: %d"%countOfStb)
break
else:
time.sleep(1)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
# wait for data ready
prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result")
while 1:
tdSql.query("select * from consumeresult")
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 1:
break
else:
time.sleep(5)
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt)
tdSql.query("drop topic %s"%topicFromStb)
tdSql.query("drop topic %s"%topicFromCtb)
# ==============================================================================
tdLog.printNoPrefix("======== test scenario 2: add child table with consuming ")
tdLog.info(" clean database")
# create and start thread
parameterDict = {'cfg': '', \
'dbName': 'db2', \
'vgroups': 1, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start()
# wait db ready
while 1:
tdSql.query("show databases")
if tdSql.getRows() == 4:
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),)
break
else:
time.sleep(1)
tdSql.query("use %s"%parameterDict['dbName'])
# wait stb ready
while 1:
tdSql.query("show %s.stables"%parameterDict['dbName'])
if tdSql.getRows() == 1:
break
else:
time.sleep(1)
tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column2'
topicFromCtb = 'topic_ctb_column2'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
time.sleep(1)
tdSql.query("show topics")
topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0)
print (topic1)
print (topic2)
print (topicFromStb)
print (topicFromCtb)
#tdLog.info("show topics: %s, %s"%topic1, topic2)
#if topic1 != topicFromStb or topic1 != topicFromCtb:
# tdLog.exit("topic error1")
#if topic2 != topicFromStb or topic2 != topicFromCtb:
# tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
consumerId = 0
expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
topicList = topicFromStb
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
tdSql.query(sql)
tdLog.info("check stb if there are data")
while 1:
tdSql.query("select count(*) from %s"%parameterDict["stbName"])
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
countOfStb = tdSql.getData(0, 0)
if countOfStb != 0:
tdLog.info("count from stb: %d"%countOfStb)
break
else:
time.sleep(1)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
# create new child table and insert data
newCtbName = 'newctb'
rowsOfNewCtb = 1000
tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"]))
startTs = parameterDict["startTs"]
for j in range(rowsOfNewCtb):
sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j)
tdSql.execute(sql)
tdLog.debug("insert data into new child table ............ [OK]")
# wait for data ready
prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result")
while 1:
tdSql.query("select * from consumeresult")
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 1:
break
else:
time.sleep(5)
expectmsgcnt += rowsOfNewCtb
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb
tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt)
# ==============================================================================
tdLog.printNoPrefix("======== test scenario 3: ")
#os.system('pkill tmq_sim')
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -51,3 +51,7 @@ python3 ./test.py -f 2-query/arcsin.py
python3 ./test.py -f 2-query/arccos.py
python3 ./test.py -f 2-query/arctan.py
# python3 ./test.py -f 2-query/query_cols_tags_and_or.py
python3 ./test.py -f 7-tmq/basic5.py

View File

@ -37,6 +37,10 @@ typedef struct {
TdThread thread;
int32_t consumerId;
int32_t autoCommitIntervalMs; // 1000 ms
char autoCommit[8]; // true, false
char autoOffsetRest[16]; // none, earliest, latest
int32_t ifCheckData;
int64_t expectMsgCnt;
@ -120,6 +124,9 @@ void saveConfigToLogFile() {
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
taosFprintfFile(g_fp, " Topics: ");
for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
@ -251,7 +258,7 @@ void build_consumer(SThreadInfo* pInfo) {
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
//tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL);
@ -272,6 +279,9 @@ void build_consumer(SThreadInfo* pInfo) {
// tmq_conf_set(conf, "auto.offset.reset", "latest");
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
return;
}
@ -324,9 +334,11 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs++;
if (totalMsgs >= pInfo->expectMsgCnt) {
taosFprintfFile(g_fp, "==== totalMsgs >= pInfo->expectMsgCnt, so break\n");
break;
}
} else {
taosFprintfFile(g_fp, "==== delay over time, so break\n");
break;
}
}
@ -355,6 +367,9 @@ void* consumeThreadFunc(void* param) {
exit(-1);
}
tmq_list_destroy(pInfo->topicList);
pInfo->topicList = NULL;
loop_consume(pInfo);
tmq_commit(pInfo->tmq, NULL, 0);
@ -442,6 +457,11 @@ int32_t getConsumeInfo() {
while ((row = taos_fetch_row(pRes))) {
int32_t* lengths = taos_fetch_lengths(pRes);
// set default value
g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL || 0 == i) {
continue;
@ -457,6 +477,12 @@ int32_t getConsumeInfo() {
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
} else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
} else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, row[i], lengths[i]);
} else if ((7 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = *((int32_t*)row[i]);
} else if ((8 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, row[i], lengths[i]);
}
}
numOfThread++;