Merge branch '3.0' of github.com:taosdata/TDengine into szhou/python-udf
This commit is contained in:
commit
9df6609448
|
@ -845,6 +845,7 @@ typedef struct {
|
|||
int8_t cacheLast;
|
||||
int8_t replications;
|
||||
int32_t sstTrigger;
|
||||
int32_t minRows;
|
||||
} SAlterDbReq;
|
||||
|
||||
int32_t tSerializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
|
||||
|
@ -1316,6 +1317,9 @@ typedef struct {
|
|||
int8_t strict;
|
||||
int8_t cacheLast;
|
||||
int64_t reserved[8];
|
||||
// 1st modification
|
||||
int16_t sttTrigger;
|
||||
int32_t minRows;
|
||||
} SAlterVnodeConfigReq;
|
||||
|
||||
int32_t tSerializeSAlterVnodeConfigReq(void* buf, int32_t bufLen, SAlterVnodeConfigReq* pReq);
|
||||
|
|
|
@ -240,6 +240,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_TOO_MANY_USERS TAOS_DEF_ERROR_CODE(0, 0x0355)
|
||||
#define TSDB_CODE_MND_INVALID_ALTER_OPER TAOS_DEF_ERROR_CODE(0, 0x0356)
|
||||
#define TSDB_CODE_MND_AUTH_FAILURE TAOS_DEF_ERROR_CODE(0, 0x0357)
|
||||
#define TSDB_CODE_MND_USER_NOT_AVAILABLE TAOS_DEF_ERROR_CODE(0, 0x0358)
|
||||
|
||||
// mnode-stable-part1
|
||||
#define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360)
|
||||
|
|
|
@ -1216,6 +1216,12 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
|
|||
static int32_t smlInsertData(SSmlHandle *info) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if(info->pRequest->dbList == NULL){
|
||||
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
void* data = taosArrayReserve(info->pRequest->dbList, 1);
|
||||
memcpy(data, info->pRequest->pDb, TSDB_DB_FNAME_LEN > strlen(info->pRequest->pDb) ? strlen(info->pRequest->pDb) : TSDB_DB_FNAME_LEN);
|
||||
|
||||
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
|
||||
while (oneTable) {
|
||||
SSmlTableInfo *tableData = *oneTable;
|
||||
|
@ -1224,6 +1230,11 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
|||
tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
|
||||
memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
|
||||
|
||||
if(info->pRequest->tableList == NULL){
|
||||
info->pRequest->tableList = taosArrayInit(1, sizeof(SName));
|
||||
}
|
||||
taosArrayPush(info->pRequest->tableList, &pName);
|
||||
|
||||
SRequestConnInfo conn = {0};
|
||||
conn.pTrans = info->taos->pAppInfo->pTransporter;
|
||||
conn.requestId = info->pRequest->requestId;
|
||||
|
@ -1425,6 +1436,7 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
|
|||
do {
|
||||
code = smlModifyDBSchemas(info);
|
||||
if (code == 0) break;
|
||||
taosMsleep(200);
|
||||
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
|
||||
|
||||
if (code != 0) {
|
||||
|
@ -1449,62 +1461,75 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
|
|||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return NULL;
|
||||
}
|
||||
SRequestObj *request = NULL;
|
||||
SSmlHandle *info = NULL;
|
||||
while(1){
|
||||
request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
|
||||
if (request == NULL) {
|
||||
uError("SML:taos_schemaless_insert error request is null");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
|
||||
if (request == NULL) {
|
||||
uError("SML:taos_schemaless_insert error request is null");
|
||||
return NULL;
|
||||
info = smlBuildSmlInfo(taos);
|
||||
if (info == NULL) {
|
||||
request->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
uError("SML:taos_schemaless_insert error SSmlHandle is null");
|
||||
return (TAOS_RES *)request;
|
||||
}
|
||||
info->pRequest = request;
|
||||
info->isRawLine = rawLine != NULL;
|
||||
info->ttl = ttl;
|
||||
info->precision = precision;
|
||||
info->protocol = (TSDB_SML_PROTOCOL_TYPE)protocol;
|
||||
info->msgBuf.buf = info->pRequest->msgBuf;
|
||||
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
|
||||
info->lineNum = numLines;
|
||||
|
||||
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
||||
if (request->pDb == NULL) {
|
||||
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||
smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
|
||||
request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
|
||||
smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (protocol == TSDB_SML_LINE_PROTOCOL &&
|
||||
(precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
|
||||
request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
|
||||
smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (protocol == TSDB_SML_JSON_PROTOCOL) {
|
||||
numLines = 1;
|
||||
} else if (numLines <= 0) {
|
||||
request->code = TSDB_CODE_SML_INVALID_DATA;
|
||||
smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
|
||||
goto end;
|
||||
}
|
||||
|
||||
code = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
|
||||
request->code = code;
|
||||
info->cost.endTime = taosGetTimestampUs();
|
||||
info->cost.code = code;
|
||||
smlPrintStatisticInfo(info);
|
||||
if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING){
|
||||
refreshMeta(request->pTscObj, request);
|
||||
uInfo("SML:%"PRIx64" ver is old retry or object is creating code:%d", info->id, code);
|
||||
smlDestroyInfo(info);
|
||||
info = NULL;
|
||||
taos_free_result(request);
|
||||
request = NULL;
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
SSmlHandle *info = smlBuildSmlInfo(taos);
|
||||
if (info == NULL) {
|
||||
request->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
uError("SML:taos_schemaless_insert error SSmlHandle is null");
|
||||
return (TAOS_RES *)request;
|
||||
}
|
||||
info->pRequest = request;
|
||||
info->isRawLine = rawLine != NULL;
|
||||
info->ttl = ttl;
|
||||
info->precision = precision;
|
||||
info->protocol = (TSDB_SML_PROTOCOL_TYPE)protocol;
|
||||
info->msgBuf.buf = info->pRequest->msgBuf;
|
||||
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
|
||||
info->lineNum = numLines;
|
||||
|
||||
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
||||
if (request->pDb == NULL) {
|
||||
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||
smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
|
||||
request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
|
||||
smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (protocol == TSDB_SML_LINE_PROTOCOL &&
|
||||
(precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
|
||||
request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
|
||||
smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (protocol == TSDB_SML_JSON_PROTOCOL) {
|
||||
numLines = 1;
|
||||
} else if (numLines <= 0) {
|
||||
request->code = TSDB_CODE_SML_INVALID_DATA;
|
||||
smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
|
||||
goto end;
|
||||
}
|
||||
|
||||
code = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
|
||||
request->code = code;
|
||||
info->cost.endTime = taosGetTimestampUs();
|
||||
info->cost.code = code;
|
||||
smlPrintStatisticInfo(info);
|
||||
|
||||
end:
|
||||
smlDestroyInfo(info);
|
||||
return (TAOS_RES *)request;
|
||||
|
|
|
@ -436,7 +436,7 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
|||
// bind data
|
||||
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, cnt + 1);
|
||||
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
|
||||
uError("smlBuildCol error, retry");
|
||||
uDebug("smlBuildCol error, retry");
|
||||
info->dataFormat = false;
|
||||
info->reRun = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -582,8 +582,10 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
|
|||
.i = ts,
|
||||
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
|
||||
if (info->dataFormat) {
|
||||
smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
|
||||
smlBuildRow(info->currTableDataCtx);
|
||||
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
|
||||
if(ret != TSDB_CODE_SUCCESS){return ret;}
|
||||
ret = smlBuildRow(info->currTableDataCtx);
|
||||
if(ret != TSDB_CODE_SUCCESS){return ret;}
|
||||
clearColValArray(info->currTableDataCtx->pValues);
|
||||
} else {
|
||||
taosArraySet(elements->colArray, 0, &kv);
|
||||
|
|
|
@ -2044,8 +2044,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
|||
if (len >= size - 1) return dumpBuf;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
// len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
|
||||
// if (len >= size - 1) return dumpBuf;
|
||||
len += snprintf(dumpBuf + len, size - len, " %15f |", *(double*)var);
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);
|
||||
|
|
|
@ -2219,6 +2219,10 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
|||
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->replications) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->sstTrigger) < 0) return -1;
|
||||
|
||||
// 1st modification
|
||||
if (tEncodeI32(&encoder, pReq->minRows) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -2246,6 +2250,13 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
|||
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->replications) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->sstTrigger) < 0) return -1;
|
||||
|
||||
// 1st modification
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
if (tDecodeI32(&decoder, &pReq->minRows) < 0) return -1;
|
||||
} else {
|
||||
pReq->minRows = -1;
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -4172,6 +4183,11 @@ int32_t tSerializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeCon
|
|||
for (int32_t i = 0; i < 8; ++i) {
|
||||
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
|
||||
}
|
||||
|
||||
// 1st modification
|
||||
if (tEncodeI16(&encoder, pReq->sttTrigger) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->minRows) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -4201,6 +4217,15 @@ int32_t tDeserializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeC
|
|||
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
|
||||
}
|
||||
|
||||
// 1st modification
|
||||
if (tDecodeIsEnd(&decoder)) {
|
||||
pReq->sttTrigger = -1;
|
||||
pReq->minRows = -1;
|
||||
} else {
|
||||
if (tDecodeI16(&decoder, &pReq->sttTrigger) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->minRows) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
|
|
|
@ -124,7 +124,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
|
|||
vmFreeQueue(pMgmt, pVnode);
|
||||
|
||||
if (commitAndRemoveWal) {
|
||||
dInfo("vgId:%d, commit data", pVnode->vgId);
|
||||
dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
|
||||
vnodeSyncCommit(pVnode->pImpl);
|
||||
vnodeBegin(pVnode->pImpl);
|
||||
dInfo("vgId:%d, commit data finished", pVnode->vgId);
|
||||
|
|
|
@ -543,7 +543,7 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer
|
|||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
char* qmsg; //SubPlanToString
|
||||
char* qmsg; // SubPlanToString
|
||||
SEpSet epSet;
|
||||
} SMqVgEp;
|
||||
|
||||
|
|
|
@ -725,6 +725,18 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
|
|||
terrno = 0;
|
||||
}
|
||||
|
||||
if (pAlter->sstTrigger > 0 && pAlter->sstTrigger != pDb->cfg.sstTrigger) {
|
||||
pDb->cfg.sstTrigger = pAlter->sstTrigger;
|
||||
pDb->vgVersion++;
|
||||
terrno = 0;
|
||||
}
|
||||
|
||||
if (pAlter->minRows > 0 && pAlter->minRows != pDb->cfg.minRows) {
|
||||
pDb->cfg.minRows = pAlter->minRows;
|
||||
pDb->vgVersion++;
|
||||
terrno = 0;
|
||||
}
|
||||
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
|
|
@ -337,7 +337,11 @@ SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName) {
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
SUserObj *pUser = sdbAcquire(pSdb, SDB_USER, userName);
|
||||
if (pUser == NULL) {
|
||||
terrno = TSDB_CODE_MND_USER_NOT_EXIST;
|
||||
if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||
terrno = TSDB_CODE_MND_USER_NOT_EXIST;
|
||||
} else {
|
||||
terrno = TSDB_CODE_MND_USER_NOT_AVAILABLE;
|
||||
}
|
||||
}
|
||||
return pUser;
|
||||
}
|
||||
|
|
|
@ -319,6 +319,8 @@ static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pV
|
|||
alterReq.walLevel = pDb->cfg.walLevel;
|
||||
alterReq.strict = pDb->cfg.strict;
|
||||
alterReq.cacheLast = pDb->cfg.cacheLast;
|
||||
alterReq.sttTrigger = pDb->cfg.sstTrigger;
|
||||
alterReq.minRows = pDb->cfg.minRows;
|
||||
|
||||
mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);
|
||||
int32_t contLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
|
||||
|
|
|
@ -780,6 +780,7 @@ typedef struct SCacheRowsReader {
|
|||
SDataFReader *pDataFReader;
|
||||
SDataFReader *pDataFReaderLast;
|
||||
const char *idstr;
|
||||
int64_t lastTs;
|
||||
} SCacheRowsReader;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -983,11 +983,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
pTask->tbSink.vnode = pTq->pVnode;
|
||||
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
|
||||
|
||||
/*A(pTask->tbSink.pSchemaWrapper);*/
|
||||
/*A(pTask->tbSink.pSchemaWrapper->pSchema);*/
|
||||
int32_t version = 1;
|
||||
SMetaInfo info = {0};
|
||||
int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
version = info.skmVer;
|
||||
}
|
||||
|
||||
pTask->tbSink.pTSchema =
|
||||
tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1);
|
||||
tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, version);
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
}
|
||||
|
||||
|
|
|
@ -506,6 +506,11 @@ static int32_t getTableDelData(STbData *pMem, STbData *pIMem, SDelFReader *pDelR
|
|||
SArray *aDelData) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pDelIdx) {
|
||||
code = getTableDelDataFromDelIdx(pDelReader, pDelIdx, aDelData);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
if (pMem) {
|
||||
code = getTableDelDataFromTbData(pMem, aDelData);
|
||||
if (code) goto _err;
|
||||
|
@ -516,11 +521,6 @@ static int32_t getTableDelData(STbData *pMem, STbData *pIMem, SDelFReader *pDelR
|
|||
if (code) goto _err;
|
||||
}
|
||||
|
||||
if (pDelIdx) {
|
||||
code = getTableDelDataFromDelIdx(pDelReader, pDelIdx, aDelData);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
@ -593,9 +593,10 @@ typedef struct {
|
|||
SMergeTree mergeTree;
|
||||
SMergeTree *pMergeTree;
|
||||
SSttBlockLoadInfo *pLoadInfo;
|
||||
int64_t lastTs;
|
||||
} SFSLastNextRowIter;
|
||||
|
||||
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
||||
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) {
|
||||
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -641,15 +642,27 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
|||
}
|
||||
state->state = SFSLASTNEXTROW_BLOCKROW;
|
||||
}
|
||||
case SFSLASTNEXTROW_BLOCKROW:
|
||||
state->row = tMergeTreeGetRow(&state->mergeTree);
|
||||
*ppRow = &state->row;
|
||||
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
||||
case SFSLASTNEXTROW_BLOCKROW: {
|
||||
bool hasVal = false;
|
||||
do {
|
||||
state->row = tMergeTreeGetRow(&state->mergeTree);
|
||||
*ppRow = &state->row;
|
||||
hasVal = tMergeTreeNext(&state->mergeTree);
|
||||
} while (TSDBROW_TS(&state->row) <= state->lastTs && hasVal);
|
||||
|
||||
if (TSDBROW_TS(&state->row) <= state->lastTs) {
|
||||
*pIgnoreEarlierTs = true;
|
||||
state->state = SFSLASTNEXTROW_FILESET;
|
||||
goto _next_fileset;
|
||||
}
|
||||
|
||||
*pIgnoreEarlierTs = false;
|
||||
if (!hasVal) {
|
||||
state->state = SFSLASTNEXTROW_FILESET;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
|
@ -722,9 +735,10 @@ typedef struct SFSNextRowIter {
|
|||
int32_t iRow;
|
||||
TSDBROW row;
|
||||
SSttBlockLoadInfo *pLoadInfo;
|
||||
int64_t lastTs;
|
||||
} SFSNextRowIter;
|
||||
|
||||
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
||||
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) {
|
||||
SFSNextRowIter *state = (SFSNextRowIter *)iter;
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -816,12 +830,14 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
|||
SDataBlk block = {0};
|
||||
|
||||
tDataBlkReset(&block);
|
||||
// tBlockDataReset(&state->blockData);
|
||||
tBlockDataReset(state->pBlockData);
|
||||
|
||||
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk);
|
||||
/* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL);
|
||||
*/
|
||||
if (block.maxKey.ts <= state->lastTs) {
|
||||
*pIgnoreEarlierTs = true;
|
||||
goto _next_fileset;
|
||||
}
|
||||
*pIgnoreEarlierTs = false;
|
||||
tBlockDataReset(state->pBlockData);
|
||||
TABLEID tid = {.suid = state->suid, .uid = state->uid};
|
||||
code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0);
|
||||
|
@ -931,16 +947,23 @@ typedef struct SMemNextRowIter {
|
|||
SMEMNEXTROWSTATES state;
|
||||
STbData *pMem; // [input]
|
||||
STbDataIter iter; // mem buffer skip list iterator
|
||||
int64_t lastTs;
|
||||
// bool iterOpened;
|
||||
// TSDBROW *curRow;
|
||||
} SMemNextRowIter;
|
||||
|
||||
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow) {
|
||||
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) {
|
||||
SMemNextRowIter *state = (SMemNextRowIter *)iter;
|
||||
int32_t code = 0;
|
||||
*pIgnoreEarlierTs = false;
|
||||
switch (state->state) {
|
||||
case SMEMNEXTROW_ENTER: {
|
||||
if (state->pMem != NULL) {
|
||||
if (state->pMem->maxKey <= state->lastTs) {
|
||||
*ppRow = NULL;
|
||||
*pIgnoreEarlierTs = true;
|
||||
return code;
|
||||
}
|
||||
tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
|
||||
|
||||
TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
|
||||
|
@ -1041,13 +1064,14 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
|
|||
return deleted;
|
||||
}
|
||||
|
||||
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow);
|
||||
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs);
|
||||
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
|
||||
|
||||
typedef struct {
|
||||
TSDBROW *pRow;
|
||||
bool stop;
|
||||
bool next;
|
||||
bool ignoreEarlierTs;
|
||||
void *iter;
|
||||
_next_row_fn_t nextRowFn;
|
||||
_next_row_clear_fn_t nextRowClearFn;
|
||||
|
@ -1070,7 +1094,7 @@ typedef struct {
|
|||
|
||||
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
|
||||
SSttBlockLoadInfo *pLoadInfo, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader,
|
||||
SDataFReader **pDataFReaderLast) {
|
||||
SDataFReader **pDataFReaderLast, int64_t lastTs) {
|
||||
int code = 0;
|
||||
|
||||
STbData *pMem = NULL;
|
||||
|
@ -1131,6 +1155,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
|||
pIter->fsLastState.uid = uid;
|
||||
pIter->fsLastState.pLoadInfo = pLoadInfo;
|
||||
pIter->fsLastState.pDataFReader = pDataFReaderLast;
|
||||
pIter->fsLastState.lastTs = lastTs;
|
||||
|
||||
pIter->fsState.state = SFSNEXTROW_FS;
|
||||
pIter->fsState.pTsdb = pTsdb;
|
||||
|
@ -1141,17 +1166,19 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
|||
pIter->fsState.uid = uid;
|
||||
pIter->fsState.pLoadInfo = pLoadInfo;
|
||||
pIter->fsState.pDataFReader = pDataFReader;
|
||||
pIter->fsState.lastTs = lastTs;
|
||||
|
||||
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL};
|
||||
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL};
|
||||
pIter->input[2] = (TsdbNextRowState){&pIter->fsLastRow, false, true, &pIter->fsLastState, getNextRowFromFSLast,
|
||||
clearNextRowFromFSLast};
|
||||
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
|
||||
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
|
||||
pIter->input[2] = (TsdbNextRowState){
|
||||
&pIter->fsLastRow, false, true, false, &pIter->fsLastState, getNextRowFromFSLast, clearNextRowFromFSLast};
|
||||
pIter->input[3] =
|
||||
(TsdbNextRowState){&pIter->fsRow, false, true, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
|
||||
(TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
|
||||
|
||||
if (pMem) {
|
||||
pIter->memState.pMem = pMem;
|
||||
pIter->memState.state = SMEMNEXTROW_ENTER;
|
||||
pIter->memState.lastTs = lastTs;
|
||||
pIter->input[0].stop = false;
|
||||
pIter->input[0].next = true;
|
||||
}
|
||||
|
@ -1159,6 +1186,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
|||
if (pIMem) {
|
||||
pIter->imemState.pMem = pIMem;
|
||||
pIter->imemState.state = SMEMNEXTROW_ENTER;
|
||||
pIter->imemState.lastTs = lastTs;
|
||||
pIter->input[1].stop = false;
|
||||
pIter->input[1].next = true;
|
||||
}
|
||||
|
@ -1186,12 +1214,12 @@ _err:
|
|||
}
|
||||
|
||||
// iterate next row non deleted backward ts, version (from high to low)
|
||||
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
|
||||
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) {
|
||||
int code = 0;
|
||||
for (;;) {
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
if (pIter->input[i].next && !pIter->input[i].stop) {
|
||||
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow);
|
||||
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, &pIter->input[i].ignoreEarlierTs);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pIter->input[i].pRow == NULL) {
|
||||
|
@ -1203,6 +1231,8 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
|
|||
|
||||
if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop && pIter->input[3].stop) {
|
||||
*ppRow = NULL;
|
||||
*pIgnoreEarlierTs = (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs ||
|
||||
pIter->input[2].ignoreEarlierTs || pIter->input[3].ignoreEarlierTs);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1303,6 +1333,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
|
|||
int16_t noneCol = 0;
|
||||
bool setNoneCol = false;
|
||||
bool hasRow = false;
|
||||
bool ignoreEarlierTs = false;
|
||||
SArray *pColArray = NULL;
|
||||
SColVal *pColVal = &(SColVal){0};
|
||||
|
||||
|
@ -1315,11 +1346,11 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
|
|||
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
|
||||
&pr->pDataFReaderLast);
|
||||
&pr->pDataFReaderLast, pr->lastTs);
|
||||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
nextRowIterGet(&iter, &pRow);
|
||||
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs);
|
||||
|
||||
if (!pRow) {
|
||||
break;
|
||||
|
@ -1419,7 +1450,12 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
|
|||
// taosArrayDestroy(pColArray);
|
||||
//} else {
|
||||
if (!hasRow) {
|
||||
taosArrayClear(pColArray);
|
||||
if (ignoreEarlierTs) {
|
||||
taosArrayDestroy(pColArray);
|
||||
pColArray = NULL;
|
||||
} else {
|
||||
taosArrayClear(pColArray);
|
||||
}
|
||||
}
|
||||
*ppColArray = pColArray;
|
||||
//}
|
||||
|
@ -1441,6 +1477,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
|
|||
int16_t noneCol = 0;
|
||||
bool setNoneCol = false;
|
||||
bool hasRow = false;
|
||||
bool ignoreEarlierTs = false;
|
||||
SArray *pColArray = NULL;
|
||||
SColVal *pColVal = &(SColVal){0};
|
||||
|
||||
|
@ -1453,11 +1490,11 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
|
|||
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
|
||||
&pr->pDataFReaderLast);
|
||||
&pr->pDataFReaderLast, pr->lastTs);
|
||||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
nextRowIterGet(&iter, &pRow);
|
||||
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs);
|
||||
|
||||
if (!pRow) {
|
||||
break;
|
||||
|
@ -1557,7 +1594,12 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
|
|||
// taosArrayDestroy(pColArray);
|
||||
//} else {
|
||||
if (!hasRow) {
|
||||
taosArrayClear(pColArray);
|
||||
if (ignoreEarlierTs) {
|
||||
taosArrayDestroy(pColArray);
|
||||
pColArray = NULL;
|
||||
} else {
|
||||
taosArrayClear(pColArray);
|
||||
}
|
||||
}
|
||||
*ppLastArray = pColArray;
|
||||
//}
|
||||
|
@ -1591,8 +1633,8 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *
|
|||
SArray *pArray = NULL;
|
||||
bool dup = false; // which is always false for now
|
||||
code = mergeLastRow(uid, pTsdb, &dup, &pArray, pr);
|
||||
// if table's empty or error, set handle NULL and return
|
||||
if (code < 0 /* || pArray == NULL*/) {
|
||||
// if table's empty or error or ignore ignore earlier ts, set handle NULL and return
|
||||
if (code < 0 || pArray == NULL) {
|
||||
if (!dup && pArray) {
|
||||
taosArrayDestroy(pArray);
|
||||
}
|
||||
|
@ -1635,8 +1677,8 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr,
|
|||
if (!h) {
|
||||
SArray *pLastArray = NULL;
|
||||
code = mergeLast(uid, pTsdb, &pLastArray, pr);
|
||||
// if table's empty or error, set handle NULL and return
|
||||
if (code < 0 /* || pLastArray == NULL*/) {
|
||||
// if table's empty or error or ignore ignore earlier ts, set handle NULL and return
|
||||
if (code < 0 || pLastArray == NULL) {
|
||||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
|
||||
*handle = NULL;
|
||||
|
|
|
@ -199,6 +199,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
|
|||
p->idstr = taosStrdup(idstr);
|
||||
taosThreadMutexInit(&p->readerMutex, NULL);
|
||||
|
||||
p->lastTs = INT64_MIN;
|
||||
|
||||
*pReader = p;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -347,6 +349,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
}
|
||||
|
||||
{
|
||||
bool hasNotNullRow = true;
|
||||
int64_t minTs = INT64_MAX;
|
||||
for (int32_t k = 0; k < pr->numOfCols; ++k) {
|
||||
int32_t slotId = slotIds[k];
|
||||
|
||||
|
@ -357,6 +361,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
hasRes = true;
|
||||
p->ts = pCol->ts;
|
||||
p->colVal = pCol->colVal;
|
||||
minTs = pCol->ts;
|
||||
|
||||
// only set value for last row query
|
||||
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
|
||||
|
@ -373,11 +378,17 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
|
||||
if (pColVal->ts > p->ts) {
|
||||
if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
if (!COL_VAL_IS_VALUE(&p->colVal)) {
|
||||
hasNotNullRow = false;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
hasRes = true;
|
||||
p->ts = pColVal->ts;
|
||||
if (pColVal->ts < minTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
minTs = pColVal->ts;
|
||||
}
|
||||
|
||||
if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
p->colVal = pColVal->colVal;
|
||||
|
@ -394,6 +405,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (hasNotNullRow) {
|
||||
pr->lastTs = minTs;
|
||||
}
|
||||
}
|
||||
|
||||
tsdbCacheRelease(lruCache, h);
|
||||
|
|
|
@ -198,7 +198,7 @@ static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
|||
ASSERT(pReader->bData.nRow);
|
||||
|
||||
int32_t aBufN[5] = {0};
|
||||
code = tCmprBlockData(&pReader->bData, TWO_STAGE_COMP, NULL, NULL, pReader->aBuf, aBufN);
|
||||
code = tCmprBlockData(&pReader->bData, NO_COMPRESSION, NULL, NULL, pReader->aBuf, aBufN);
|
||||
if (code) goto _exit;
|
||||
|
||||
int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
|
||||
|
@ -276,7 +276,7 @@ static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* pReader, uint8_t** pp
|
|||
code = tsdbSnapReadNextRow(pReader, &pRowInfo);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (pReader->bData.nRow >= 4096) break;
|
||||
if (pReader->bData.nRow >= 81920) break;
|
||||
} while (pRowInfo);
|
||||
|
||||
ASSERT(pReader->bData.nRow > 0);
|
||||
|
|
|
@ -1541,6 +1541,14 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
|
|||
}
|
||||
}
|
||||
|
||||
if (req.sttTrigger != -1 && req.sttTrigger != pVnode->config.sttTrigger) {
|
||||
pVnode->config.sttTrigger = req.sttTrigger;
|
||||
}
|
||||
|
||||
if (req.minRows != -1 && req.minRows != pVnode->config.tsdbCfg.minRows) {
|
||||
pVnode->config.tsdbCfg.minRows = req.minRows;
|
||||
}
|
||||
|
||||
if (walChanged) {
|
||||
walAlter(pVnode->pWal, &pVnode->config.walCfg);
|
||||
}
|
||||
|
@ -1656,7 +1664,7 @@ _err:
|
|||
}
|
||||
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SDropIndexReq req = {0};
|
||||
pRsp->msgType = TDMT_VND_CREATE_INDEX_RSP;
|
||||
pRsp->msgType = TDMT_VND_DROP_INDEX_RSP;
|
||||
pRsp->code = TSDB_CODE_SUCCESS;
|
||||
pRsp->pCont = NULL;
|
||||
pRsp->contLen = 0;
|
||||
|
@ -1665,6 +1673,7 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *p
|
|||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (metaDropIndexFromSTable(pVnode->pMeta, version, &req) < 0) {
|
||||
pRsp->code = terrno;
|
||||
return -1;
|
||||
|
|
|
@ -121,7 +121,8 @@ enum {
|
|||
STREAM_RECOVER_STEP__NONE = 0,
|
||||
STREAM_RECOVER_STEP__PREPARE1,
|
||||
STREAM_RECOVER_STEP__PREPARE2,
|
||||
STREAM_RECOVER_STEP__SCAN,
|
||||
STREAM_RECOVER_STEP__SCAN1,
|
||||
STREAM_RECOVER_STEP__SCAN2,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -1110,11 +1110,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
|
||||
|
||||
#ifndef NDEBUG
|
||||
qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
|
||||
pTableScanInfo->currentTable, pInfo->pTableScanOp->resultInfo.totalRows);
|
||||
qDebug("switch to next table %" PRId64 " ts %" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows);
|
||||
pInfo->pTableScanOp->resultInfo.totalRows = 0;
|
||||
#endif
|
||||
|
||||
bool found = false;
|
||||
for (int32_t i = 0; i < numOfTables; i++) {
|
||||
|
|
|
@ -1608,8 +1608,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
|||
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||
if (pResult && pResult->info.rows > 0) {
|
||||
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64, pResult->info.rows,
|
||||
pResult->info.window.skey, pResult->info.window.ekey);
|
||||
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows,
|
||||
pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion);
|
||||
pTaskInfo->streamInfo.returned = 1;
|
||||
|
@ -1724,9 +1722,9 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
|||
}
|
||||
}
|
||||
|
||||
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey) {
|
||||
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
|
||||
if (pInfo->pUpdateInfo) {
|
||||
checkUpdateData(pInfo, true, pInfo->pRes, true);
|
||||
checkUpdateData(pInfo, true, pBlock, true);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);
|
||||
if (pInfo->pUpdateDataRes->info.rows > 0) {
|
||||
pInfo->updateResIndex = 0;
|
||||
|
@ -1758,11 +1756,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
|
||||
qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
|
||||
pTSInfo->base.cond.endVersion);
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
|
||||
} else {
|
||||
pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
|
||||
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
|
||||
qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
|
||||
pTSInfo->base.cond.endVersion);
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
|
||||
}
|
||||
|
||||
/*resetTableScanInfo(pTSInfo, pWin);*/
|
||||
|
@ -1772,11 +1772,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
|
||||
pTSInfo->scanTimes = 0;
|
||||
pTSInfo->currentGroupId = -1;
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
|
||||
pTaskInfo->streamInfo.recoverScanFinished = false;
|
||||
}
|
||||
|
||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
|
||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
|
||||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
|
||||
if (pInfo->blockRecoverContiCnt > 100) {
|
||||
pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
|
||||
pInfo->blockRecoverContiCnt = 0;
|
||||
|
@ -1789,6 +1789,27 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
printDataBlock(pInfo->pRecoverRes, "scan recover");
|
||||
return pInfo->pRecoverRes;
|
||||
} break;
|
||||
case STREAM_SCAN_FROM_UPDATERES: {
|
||||
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
|
||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
return pInfo->pUpdateRes;
|
||||
} break;
|
||||
case STREAM_SCAN_FROM_DATAREADER_RANGE: {
|
||||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||
if (pSDB) {
|
||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
|
||||
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
|
||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||
checkUpdateData(pInfo, true, pSDB, false);
|
||||
// printDataBlock(pSDB, "stream scan update");
|
||||
calBlockTbName(pInfo, pSDB);
|
||||
return pSDB;
|
||||
}
|
||||
blockDataCleanup(pInfo->pUpdateDataRes);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
} break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -1798,8 +1819,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
pInfo->blockRecoverContiCnt++;
|
||||
calBlockTbName(pInfo, pInfo->pRecoverRes);
|
||||
if (pInfo->pUpdateInfo) {
|
||||
TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
|
||||
TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
} else {
|
||||
doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
|
||||
}
|
||||
}
|
||||
if (pInfo->pCreateTbRes->info.rows > 0) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||
|
@ -1910,7 +1935,7 @@ FETCH_NEXT_BLOCK:
|
|||
switch (pInfo->scanMode) {
|
||||
case STREAM_SCAN_FROM_RES: {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey);
|
||||
doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
|
||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
pInfo->pRes->info.dataLoad = 1;
|
||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||
|
@ -2011,7 +2036,7 @@ FETCH_NEXT_BLOCK:
|
|||
return pInfo->pCreateTbRes;
|
||||
}
|
||||
|
||||
doCheckUpdate(pInfo, pBlockInfo->window.ekey);
|
||||
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
|
||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
pInfo->pRes->info.dataLoad = 1;
|
||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||
|
|
|
@ -4805,10 +4805,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type);
|
||||
}
|
||||
|
||||
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
|
||||
|
||||
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap);
|
||||
}
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
|
||||
|
|
|
@ -1462,7 +1462,7 @@ SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) {
|
|||
}
|
||||
|
||||
void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc) {
|
||||
if (NULL == pTarget || NULL == pPos || NULL == pSrc) {
|
||||
if (NULL == pTarget || NULL == pPos || NULL == pSrc || NULL == pSrc->pHead) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -236,6 +236,7 @@ alter_db_option(A) ::= REPLICA NK_INTEGER(B).
|
|||
//alter_db_option(A) ::= STRICT NK_STRING(B). { A.type = DB_OPTION_STRICT; A.val = B; }
|
||||
alter_db_option(A) ::= WAL_LEVEL NK_INTEGER(B). { A.type = DB_OPTION_WAL; A.val = B; }
|
||||
alter_db_option(A) ::= STT_TRIGGER NK_INTEGER(B). { A.type = DB_OPTION_STT_TRIGGER; A.val = B; }
|
||||
alter_db_option(A) ::= MINROWS NK_INTEGER(B). { A.type = DB_OPTION_MINROWS; A.val = B; }
|
||||
|
||||
%type integer_list { SNodeList* }
|
||||
%destructor integer_list { nodesDestroyList($$); }
|
||||
|
|
|
@ -201,6 +201,7 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32
|
|||
SSmlKv* kv = (SSmlKv*)data;
|
||||
if(kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || kv->type != pColSchema->type){
|
||||
ret = TSDB_CODE_SML_INVALID_DATA;
|
||||
uError("SML smlBuildCol error col not same %s", pColSchema->name);
|
||||
goto end;
|
||||
}
|
||||
if (kv->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
|
|
|
@ -2500,9 +2500,8 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
|||
STempTableNode* pTempTable = (STempTableNode*)pTable;
|
||||
code = translateSubquery(pCxt, pTempTable->pSubquery);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pTempTable->pSubquery) &&
|
||||
((SSelectStmt*)pTempTable->pSubquery)->isEmptyResult &&
|
||||
isSelectStmt(pCxt->pCurrStmt)) {
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pTempTable->pSubquery) &&
|
||||
((SSelectStmt*)pTempTable->pSubquery)->isEmptyResult && isSelectStmt(pCxt->pCurrStmt)) {
|
||||
((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
|
||||
}
|
||||
|
||||
|
@ -2935,6 +2934,9 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateFillValues(pCxt, pSelect);
|
||||
}
|
||||
if (NULL == pSelect->pProjectionList || 0 >= pSelect->pProjectionList->length) {
|
||||
code = TSDB_CODE_PAR_INVALID_SELECTED_EXPR;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -4251,6 +4253,7 @@ static void buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt,
|
|||
pReq->cacheLastSize = pStmt->pOptions->cacheLastSize;
|
||||
pReq->replications = pStmt->pOptions->replica;
|
||||
pReq->sstTrigger = pStmt->pOptions->sstTrigger;
|
||||
pReq->minRows = pStmt->pOptions->minRowsPerBlock;
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -5570,7 +5573,8 @@ static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt
|
|||
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||
if (!pSelect->isDistinct && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable) &&
|
||||
if (!pSelect->isDistinct &&
|
||||
(NULL != pSelect->pFromTable && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable)) &&
|
||||
NULL == pSelect->pGroupByList && NULL == pSelect->pLimit && NULL == pSelect->pSlimit &&
|
||||
NULL == pSelect->pOrderByList && NULL == pSelect->pPartitionByList) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -108,6 +108,7 @@ TEST_F(ParserInitialATest, alterDnode) {
|
|||
* | REPLICA int_value -- todo: enum 1, 3, default 1, unit replica
|
||||
* | WAL_LEVEL int_value -- enum 1, 2, default 1
|
||||
* | STT_TRIGGER int_value -- rang [1, 16], default 8
|
||||
* | MINROWS int_value -- rang [10, 1000], default 100
|
||||
* }
|
||||
*/
|
||||
TEST_F(ParserInitialATest, alterDatabase) {
|
||||
|
@ -133,6 +134,7 @@ TEST_F(ParserInitialATest, alterDatabase) {
|
|||
expect.cacheLastSize = -1;
|
||||
expect.replications = -1;
|
||||
expect.sstTrigger = -1;
|
||||
expect.minRows = -1;
|
||||
};
|
||||
auto setAlterDbBuffer = [&](int32_t buffer) { expect.buffer = buffer; };
|
||||
auto setAlterDbPageSize = [&](int32_t pageSize) { expect.pageSize = pageSize; };
|
||||
|
@ -150,6 +152,7 @@ TEST_F(ParserInitialATest, alterDatabase) {
|
|||
auto setAlterDbCacheModel = [&](int8_t cacheModel) { expect.cacheLast = cacheModel; };
|
||||
auto setAlterDbReplica = [&](int8_t replications) { expect.replications = replications; };
|
||||
auto setAlterDbSttTrigger = [&](int8_t sstTrigger) { expect.sstTrigger = sstTrigger; };
|
||||
auto setAlterDbMinRows = [&](int32_t minRows) { expect.minRows = minRows; };
|
||||
|
||||
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
|
||||
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_ALTER_DATABASE_STMT);
|
||||
|
@ -170,6 +173,7 @@ TEST_F(ParserInitialATest, alterDatabase) {
|
|||
ASSERT_EQ(req.cacheLast, expect.cacheLast);
|
||||
ASSERT_EQ(req.replications, expect.replications);
|
||||
ASSERT_EQ(req.sstTrigger, expect.sstTrigger);
|
||||
ASSERT_EQ(req.minRows, expect.minRows);
|
||||
});
|
||||
|
||||
const int32_t MINUTE_PER_DAY = MILLISECOND_PER_DAY / MILLISECOND_PER_MINUTE;
|
||||
|
@ -277,6 +281,15 @@ TEST_F(ParserInitialATest, alterDatabase) {
|
|||
setAlterDbSttTrigger(16);
|
||||
run("ALTER DATABASE test STT_TRIGGER 16");
|
||||
clearAlterDbReq();
|
||||
|
||||
initAlterDb("test");
|
||||
setAlterDbMinRows(10);
|
||||
run("ALTER DATABASE test MINROWS 10");
|
||||
setAlterDbMinRows(50);
|
||||
run("ALTER DATABASE test MINROWS 50");
|
||||
setAlterDbMinRows(1000);
|
||||
run("ALTER DATABASE test MINROWS 1000");
|
||||
clearAlterDbReq();
|
||||
}
|
||||
|
||||
TEST_F(ParserInitialATest, alterDatabaseSemanticCheck) {
|
||||
|
|
|
@ -59,6 +59,8 @@ int32_t sclCreateColumnInfoData(SDataType *pType, int32_t numOfRows, SScalarPara
|
|||
|
||||
pParam->columnData = pColumnData;
|
||||
pParam->colAlloced = true;
|
||||
pParam->numOfRows = numOfRows;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -740,6 +742,10 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
|
|||
SCL_ERR_JRET(code);
|
||||
}
|
||||
|
||||
if (rowNum == 0) {
|
||||
goto _return;
|
||||
}
|
||||
|
||||
code = (*ffpSet.process)(params, paramNum, output);
|
||||
if (code) {
|
||||
sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
|
||||
|
|
|
@ -1055,9 +1055,9 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
|
|||
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) {
|
||||
timeVal = timeVal / 1000;
|
||||
} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
|
||||
timeVal = timeVal / (1000 * 1000);
|
||||
timeVal = timeVal / ((int64_t)(1000 * 1000));
|
||||
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
|
||||
timeVal = timeVal / (1000 * 1000 * 1000);
|
||||
timeVal = timeVal / ((int64_t)(1000 * 1000 * 1000));
|
||||
} else {
|
||||
colDataSetNULL(pOutput->columnData, i);
|
||||
continue;
|
||||
|
@ -1317,19 +1317,19 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
|
|||
case 86400000: { /* 1d */
|
||||
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) {
|
||||
if (ignoreTz) {
|
||||
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000)) % (86400L * 1000);
|
||||
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000)) % (((int64_t)86400) * 1000);
|
||||
} else {
|
||||
timeVal = timeVal / 1000 / 86400 * 86400 * 1000;
|
||||
}
|
||||
} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
|
||||
if (ignoreTz) {
|
||||
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000000)) % (86400L * 1000000);
|
||||
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000000)) % (((int64_t)86400) * 1000000);
|
||||
} else {
|
||||
timeVal = timeVal / 1000000 / 86400 * 86400 * 1000000;
|
||||
}
|
||||
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
|
||||
if (ignoreTz) {
|
||||
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000000000)) % (86400L * 1000000000);
|
||||
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000000000)) % (((int64_t)86400) * 1000000000);
|
||||
} else {
|
||||
timeVal = timeVal / 1000000000 / 86400 * 86400 * 1000000000;
|
||||
}
|
||||
|
|
|
@ -1082,21 +1082,15 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
|
|||
ASSERT(pSyncNode->pFsm != NULL);
|
||||
ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);
|
||||
|
||||
while (1) {
|
||||
int32_t aqItems = pSyncNode->pFsm->FpApplyQueueItems(pSyncNode->pFsm);
|
||||
sTrace("vgId:%d, pre close, %d items in apply queue", pSyncNode->vgId, aqItems);
|
||||
if (aqItems == 0 || aqItems == -1) {
|
||||
break;
|
||||
}
|
||||
taosMsleep(20);
|
||||
}
|
||||
|
||||
// stop elect timer
|
||||
syncNodeStopElectTimer(pSyncNode);
|
||||
|
||||
// stop heartbeat timer
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
|
||||
// stop ping timer
|
||||
syncNodeStopPingTimer(pSyncNode);
|
||||
|
||||
// clean rsp
|
||||
syncRespCleanRsp(pSyncNode->pSyncRespMgr);
|
||||
}
|
||||
|
@ -1120,10 +1114,11 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
|||
if (pSyncNode == NULL) return;
|
||||
sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);
|
||||
|
||||
syncRespCleanRsp(pSyncNode->pSyncRespMgr);
|
||||
|
||||
syncNodeStopPingTimer(pSyncNode);
|
||||
syncNodeStopElectTimer(pSyncNode);
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
|
||||
syncNodeLogReplMgrDestroy(pSyncNode);
|
||||
|
||||
syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
|
||||
|
|
|
@ -425,21 +425,6 @@ void cliHandleResp(SCliConn* conn) {
|
|||
tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
||||
}
|
||||
|
||||
// if (TMSG_INFO(pHead->msgType - 1) != 0) {
|
||||
// char buf[128] = {0};
|
||||
// sprintf(buf, "%s", TMSG_INFO(pHead->msgType - 1));
|
||||
// int* count = taosHashGet(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1)));
|
||||
// if (NULL == 0) {
|
||||
// int localCount = 1;
|
||||
// taosHashPut(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1)), &localCount,
|
||||
// sizeof(localCount));
|
||||
// } else {
|
||||
// int localCount = *count - 1;
|
||||
// taosHashPut(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1)), &localCount,
|
||||
// sizeof(localCount));
|
||||
// }
|
||||
// }
|
||||
|
||||
STraceId* trace = &transMsg.info.traceId;
|
||||
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
|
||||
TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));
|
||||
|
@ -1118,19 +1103,6 @@ void cliSend(SCliConn* pConn) {
|
|||
msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
|
||||
}
|
||||
|
||||
// if (tmsgIsValid(pHead->msgType)) {
|
||||
// char buf[128] = {0};
|
||||
// sprintf(buf, "%s", TMSG_INFO(pHead->msgType));
|
||||
// int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
|
||||
// if (NULL == 0) {
|
||||
// int localCount = 1;
|
||||
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
// } else {
|
||||
// int localCount = *count + 1;
|
||||
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
// }
|
||||
// }
|
||||
|
||||
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);
|
||||
|
||||
|
@ -1525,16 +1497,19 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
destroyCmsg(pMsg);
|
||||
return;
|
||||
}
|
||||
if (tmsgIsValid(pMsg->msg.msgType)) {
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType));
|
||||
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
|
||||
if (NULL == 0) {
|
||||
int localCount = 1;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
} else {
|
||||
int localCount = *count + 1;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
|
||||
if (rpcDebugFlag & DEBUG_TRACE) {
|
||||
if (tmsgIsValid(pMsg->msg.msgType)) {
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType));
|
||||
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
|
||||
if (NULL == 0) {
|
||||
int localCount = 1;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
} else {
|
||||
int localCount = *count + 1;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1782,18 +1757,20 @@ static void cliAsyncCb(uv_async_t* handle) {
|
|||
QUEUE_MOVE(&item->qmsg, &wq);
|
||||
taosThreadMutexUnlock(&item->mtx);
|
||||
|
||||
void* pIter = taosHashIterate(pThrd->msgCount, NULL);
|
||||
while (pIter != NULL) {
|
||||
int* count = pIter;
|
||||
size_t len = 0;
|
||||
char* key = taosHashGetKey(pIter, &len);
|
||||
if (*count != 0) {
|
||||
tDebug("key: %s count: %d", key, *count);
|
||||
}
|
||||
if (rpcDebugFlag & DEBUG_TRACE) {
|
||||
void* pIter = taosHashIterate(pThrd->msgCount, NULL);
|
||||
while (pIter != NULL) {
|
||||
int* count = pIter;
|
||||
size_t len = 0;
|
||||
char* key = taosHashGetKey(pIter, &len);
|
||||
if (*count != 0) {
|
||||
tDebug("key: %s count: %d", key, *count);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pThrd->msgCount, pIter);
|
||||
pIter = taosHashIterate(pThrd->msgCount, pIter);
|
||||
}
|
||||
tDebug("all conn count: %d", pThrd->newConnCount);
|
||||
}
|
||||
tDebug("all conn count: %d", pThrd->newConnCount);
|
||||
|
||||
int8_t supportBatch = pTransInst->supportBatch;
|
||||
if (supportBatch == 0) {
|
||||
|
@ -2379,17 +2356,18 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
|||
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
}
|
||||
}
|
||||
|
||||
if (tmsgIsValid(pResp->msgType - 1)) {
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1));
|
||||
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
|
||||
if (NULL == 0) {
|
||||
int localCount = 0;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
} else {
|
||||
int localCount = *count - 1;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
if (rpcDebugFlag & DEBUG_TRACE) {
|
||||
if (tmsgIsValid(pResp->msgType - 1)) {
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1));
|
||||
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
|
||||
if (NULL == 0) {
|
||||
int localCount = 0;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
} else {
|
||||
int localCount = *count - 1;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pCtx->pSem != NULL) {
|
||||
|
|
|
@ -25,7 +25,15 @@
|
|||
|
||||
void taosSeedRand(uint32_t seed) { return srand(seed); }
|
||||
|
||||
uint32_t taosRand(void) { return rand(); }
|
||||
uint32_t taosRand(void) {
|
||||
#ifdef WINDOWS
|
||||
unsigned int pSeed;
|
||||
rand_s(&pSeed);
|
||||
return pSeed;
|
||||
#else
|
||||
return rand();
|
||||
#endif
|
||||
}
|
||||
|
||||
uint32_t taosRandR(uint32_t* pSeed) {
|
||||
#ifdef WINDOWS
|
||||
|
|
|
@ -184,6 +184,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_ACCTS, "Too many accounts")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_ALREADY_EXIST, "User already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_NOT_EXIST, "Invalid user")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_USER_FORMAT, "Invalid user format")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_NOT_AVAILABLE, "User not available")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_PASS_FORMAT, "Invalid password format")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_USER_FROM_CONN, "Can not get user from conn")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_USERS, "Too many users")
|
||||
|
@ -211,8 +212,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_BUFSIZE, "Invalid func bufSize"
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve msg")
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST, "Tag index already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_INDEX_NOT_EXIST, "Tag index not exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST, "index already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_INDEX_NOT_EXIST, "index not exist")
|
||||
|
||||
|
||||
// mnode-db
|
||||
|
@ -301,9 +302,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STREAMS, "Too many streams")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TARGET_TABLE, "Cannot write the same stable as other stream")
|
||||
|
||||
// mnode-sma
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "SMA does not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma option")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "index already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "index not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma index option")
|
||||
|
||||
// dnode
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_OFFLINE, "Dnode is offline")
|
||||
|
|
|
@ -733,7 +733,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
|
||||
#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDnodeRestart1.py
|
||||
|
|
|
@ -81,6 +81,7 @@ echo "cmd exit code: $RET"
|
|||
md5sum /usr/lib/libtaos.so.1
|
||||
md5sum /home/TDinternal/debug/build/lib/libtaos.so
|
||||
|
||||
|
||||
if [ $RET -ne 0 ]; then
|
||||
pwd
|
||||
fi
|
||||
|
|
|
@ -285,8 +285,6 @@ sql_error alter database db keep -1
|
|||
|
||||
print ============== modify minrows
|
||||
sql_error alter database db minrows 8
|
||||
sql_error alter database db minrows 200
|
||||
sql_error alter database db minrows 11
|
||||
sql_error alter database db minrows 8000
|
||||
sql_error alter database db minrows 8001
|
||||
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
|
||||
print ===== step1
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print ===== step2
|
||||
|
||||
sql create database test vgroups 4;
|
||||
sql use test;
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2 from st interval(1s) ;
|
||||
sql insert into t1 values(1648791211000,1,2,3);
|
||||
sql insert into t1 values(1648791212000,2,2,3);
|
||||
|
||||
$loop_count = 0
|
||||
loop0:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 1 select * from streamt1;
|
||||
sql select * from streamt1;
|
||||
|
||||
if $rows != 2 then
|
||||
print rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
print drop stream streams1
|
||||
sql drop stream streams1;
|
||||
|
||||
print alter table streamt1 add column c3 double
|
||||
sql alter table streamt1 add column c3 double;
|
||||
|
||||
print create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2, avg(b) c3 from st interval(1s) ;
|
||||
sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2, avg(b) c3 from st interval(1s) ;
|
||||
|
||||
sql insert into t2 values(1648791213000,1,2,3);
|
||||
sql insert into t1 values(1648791214000,1,2,3);
|
||||
|
||||
$loop_count = 0
|
||||
loop1:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 select * from streamt1;
|
||||
sql select * from streamt1;
|
||||
|
||||
if $rows != 4 then
|
||||
print rows=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
print ======over
|
||||
|
||||
system sh/stop_dnodes.sh
|
|
@ -716,8 +716,6 @@ class TDTestCase:
|
|||
tdSql.checkRows(2)
|
||||
|
||||
# partition by col
|
||||
tdSql.query(f"select c1 , mavg(c1,3) from {dbname}.stb1 partition by c1")
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query(f"select c1 , mavg(c1,1) from {dbname}.stb1 partition by c1")
|
||||
tdSql.checkRows(40)
|
||||
tdSql.query(f"select c1, c2, c3, c4, mavg(c1,3) from {dbname}.stb1 partition by tbname ")
|
||||
|
|
|
@ -18,8 +18,8 @@ from tmqCommon import *
|
|||
class TDTestCase:
|
||||
def __init__(self):
|
||||
self.vgroups = 4
|
||||
self.ctbNum = 3000
|
||||
self.rowsPerTbl = 70
|
||||
self.ctbNum = 1000
|
||||
self.rowsPerTbl = 100
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
|
@ -112,15 +112,15 @@ class TDTestCase:
|
|||
# init consume info, and start tmq_sim, then check consume result
|
||||
tdLog.info("insert consume info to consume processor")
|
||||
consumerId = 3
|
||||
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3)
|
||||
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
|
||||
topicList = topicNameList[0]
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
|
||||
keyList = 'group.id:cgrp3, enable.auto.commit:true, auto.commit.interval.ms:100, auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
consumerId = 4
|
||||
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
|
||||
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor 0")
|
||||
|
@ -131,8 +131,8 @@ class TDTestCase:
|
|||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
actConsumeTotalRows = resultList[0] + resultList[1]
|
||||
|
||||
if not (totalRowsInserted == actConsumeTotalRows):
|
||||
tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
|
||||
tdLog.info("sum of two consume rows: %d should be greater than or equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
|
||||
if not (totalRowsInserted <= actConsumeTotalRows):
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
time.sleep(10)
|
||||
|
@ -188,7 +188,7 @@ class TDTestCase:
|
|||
topicList = topicNameList[0]
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
|
||||
keyList = 'group.id:cgrp4, enable.auto.commit:true, auto.commit.interval.ms:100, auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor 0")
|
||||
|
@ -216,9 +216,9 @@ class TDTestCase:
|
|||
|
||||
actConsumeTotalRows = resultList[0]
|
||||
|
||||
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
|
||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
||||
if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted):
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
time.sleep(10)
|
||||
|
|
|
@ -18,7 +18,7 @@ from tmqCommon import *
|
|||
class TDTestCase:
|
||||
def __init__(self):
|
||||
self.vgroups = 4
|
||||
self.ctbNum = 10
|
||||
self.ctbNum = 4
|
||||
self.rowsPerTbl = 10000
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
|
@ -41,7 +41,7 @@ class TDTestCase:
|
|||
'ctbStartIdx': 0,
|
||||
'ctbNum': 10,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 10,
|
||||
'batchNum': 1000,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 3,
|
||||
'showMsg': 1,
|
||||
|
@ -112,15 +112,15 @@ class TDTestCase:
|
|||
# init consume info, and start tmq_sim, then check consume result
|
||||
tdLog.info("insert consume info to consume processor")
|
||||
consumerId = 3
|
||||
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3)
|
||||
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
|
||||
topicList = topicNameList[0]
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
|
||||
keyList = 'group.id:cgrp3, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
consumerId = 4
|
||||
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
|
||||
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor 0")
|
||||
|
@ -130,9 +130,9 @@ class TDTestCase:
|
|||
expectRows = 2
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
actConsumeTotalRows = resultList[0] + resultList[1]
|
||||
|
||||
if not (totalRowsInserted == actConsumeTotalRows):
|
||||
tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
|
||||
|
||||
tdLog.info("sum of two consume rows: %d should be greater than or equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
|
||||
if not (totalRowsInserted <= actConsumeTotalRows):
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
time.sleep(10)
|
||||
|
@ -188,7 +188,7 @@ class TDTestCase:
|
|||
topicList = topicNameList[0]
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
|
||||
keyList = 'group.id:cgrp4, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor 0")
|
||||
|
@ -216,7 +216,7 @@ class TDTestCase:
|
|||
|
||||
actConsumeTotalRows = resultList[0]
|
||||
|
||||
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
|
||||
if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted):
|
||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
|
|
@ -41,7 +41,7 @@ class TDTestCase:
|
|||
'ctbStartIdx': 0,
|
||||
'ctbNum': 10,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 10,
|
||||
'batchNum': 100,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 3,
|
||||
'showMsg': 1,
|
||||
|
@ -87,7 +87,7 @@ class TDTestCase:
|
|||
'rowsPerTbl': 10000,
|
||||
'batchNum': 10,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 25,
|
||||
'pollDelay': 10,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 1}
|
||||
|
@ -116,7 +116,7 @@ class TDTestCase:
|
|||
topicList = topicNameList[0]
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
|
||||
keyList = 'group.id:cgrp3, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
consumerId = 4
|
||||
|
@ -131,8 +131,8 @@ class TDTestCase:
|
|||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
actConsumeTotalRows = resultList[0] + resultList[1]
|
||||
|
||||
tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
|
||||
if not (totalRowsInserted == actConsumeTotalRows):
|
||||
tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
time.sleep(10)
|
||||
|
@ -188,7 +188,7 @@ class TDTestCase:
|
|||
topicList = topicNameList[0]
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
|
||||
keyList = 'group.id:cgrp4, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor 0")
|
||||
|
@ -213,12 +213,11 @@ class TDTestCase:
|
|||
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
actConsumeTotalRows = resultList[0]
|
||||
|
||||
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
|
||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
||||
if not ((actConsumeTotalRows > 0) and (actConsumeTotalRows <= totalRowsInserted)):
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
time.sleep(10)
|
||||
|
|
|
@ -83,7 +83,7 @@ typedef struct {
|
|||
const char *clientVersion;
|
||||
char cusName[32];
|
||||
char promptHeader[32];
|
||||
const char* promptContinue;
|
||||
char promptContinue[32];
|
||||
const char* osname;
|
||||
int32_t promptSize;
|
||||
char programVersion[256];
|
||||
|
|
|
@ -411,7 +411,9 @@ int32_t shellParseArgs(int32_t argc, char *argv[]) {
|
|||
"Copyright (c) 2022 by %s, all rights reserved.\r\n\r\n";
|
||||
strcpy(shell.info.cusName, cusName);
|
||||
sprintf(shell.info.promptHeader, "%s> ", cusPrompt);
|
||||
shell.info.promptContinue = TAOS_CONSOLE_PROMPT_CONTINUE;
|
||||
char promptContinueFormat[32] = {0};
|
||||
sprintf(promptContinueFormat, "%%%zus> ", strlen(cusPrompt));
|
||||
sprintf(shell.info.promptContinue, promptContinueFormat, " ");
|
||||
shell.info.promptSize = strlen(shell.info.promptHeader);
|
||||
snprintf(shell.info.programVersion, sizeof(shell.info.programVersion),
|
||||
"version: %s compatible_version: %s\ngitinfo: %s\nbuildInfo: %s", version, compatible_version, gitinfo,
|
||||
|
|
Loading…
Reference in New Issue