This commit is contained in:
chenhaoran 2024-05-20 10:37:26 +08:00
commit ca8289b6c4
51 changed files with 4026 additions and 2424 deletions

View File

@ -329,6 +329,7 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_DB_ALIVE_STMT,
QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT,
QUERY_NODE_BALANCE_VGROUP_LEADER_STMT,
QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT,
QUERY_NODE_RESTORE_DNODE_STMT,
QUERY_NODE_RESTORE_QNODE_STMT,
QUERY_NODE_RESTORE_MNODE_STMT,
@ -2425,10 +2426,11 @@ int32_t tDeserializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistrib
void tFreeSRedistributeVgroupReq(SRedistributeVgroupReq* pReq);
typedef struct {
int32_t useless;
int32_t reserved;
int32_t vgId;
int32_t sqlLen;
char* sql;
char db[TSDB_DB_FNAME_LEN];
} SBalanceVgroupLeaderReq;
int32_t tSerializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq);

View File

@ -582,6 +582,7 @@ typedef struct SBalanceVgroupStmt {
typedef struct SBalanceVgroupLeaderStmt {
ENodeType type;
int32_t vgId;
char dbName[TSDB_DB_NAME_LEN];
} SBalanceVgroupLeaderStmt;
typedef struct SMergeVgroupStmt {

View File

@ -33,6 +33,13 @@ int tsem_timewait(tsem_t *sim, int64_t milis);
int tsem_post(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
#define tsem2_t tsem_t
#define tsem2_init tsem_init
#define tsem2_wait tsem_wait
#define tsem2_timewait tsem_timewait
#define tsem2_post tsem_post
#define tsem2_destroy tsem_destroy
#elif defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#include <windows.h>
@ -44,6 +51,13 @@ int tsem_timewait(tsem_t *sim, int64_t milis);
int tsem_post(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
#define tsem2_t tsem_t
#define tsem2_init tsem_init
#define tsem2_wait tsem_wait
#define tsem2_timewait tsem_timewait
#define tsem2_post tsem_post
#define tsem2_destroy tsem_destroy
#else
#define tsem_t sem_t
@ -53,6 +67,20 @@ int tsem_timewait(tsem_t *sim, int64_t milis);
#define tsem_post sem_post
#define tsem_destroy sem_destroy
typedef struct tsem2_t {
TdThreadMutex mutex;
TdThreadCond cond;
TdThreadCondAttr attr;
int count;
} tsem2_t;
// #define tsem2_t sem_t
int tsem2_init(tsem2_t* sem, int pshared, unsigned int value);
int tsem2_wait(tsem2_t* sem);
int tsem2_timewait(tsem2_t* sem, int64_t milis);
int tsem2_post(tsem2_t* sem);
int tsem2_destroy(tsem2_t* sem);
#endif
#if defined(_TD_DARWIN_64)

View File

@ -222,6 +222,7 @@ int32_t taosThreadCondTimedWait(TdThreadCond *cond, TdThreadMutex *mutex, const
int32_t taosThreadCondAttrDestroy(TdThreadCondAttr *attr);
int32_t taosThreadCondAttrGetPshared(const TdThreadCondAttr *attr, int32_t *pshared);
int32_t taosThreadCondAttrInit(TdThreadCondAttr *attr);
int32_t taosThreadCondAttrSetclock(TdThreadCondAttr *attr, int clockId);
int32_t taosThreadCondAttrSetPshared(TdThreadCondAttr *attr, int32_t pshared);
int32_t taosThreadDetach(TdThread thread);
int32_t taosThreadEqual(TdThread t1, TdThread t2);

View File

@ -105,7 +105,7 @@ struct tmq_t {
STaosQueue* mqueue; // queue of rsp
STaosQall* qall;
STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit
tsem_t rspSem;
tsem2_t rspSem;
};
typedef struct SAskEpInfo {
@ -727,7 +727,7 @@ static void generateTimedTask(int64_t refId, int32_t type) {
*pTaskType = type;
taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem);
tsem2_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
}
@ -742,7 +742,7 @@ void tmqReplayTask(void* param, void* tmrId) {
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) goto END;
tsem_post(&tmq->rspSem);
tsem2_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
END:
taosMemoryFree(param);
@ -1033,7 +1033,7 @@ void tmqFreeImpl(void* handle) {
}
taosFreeQall(tmq->qall);
tsem_destroy(&tmq->rspSem);
tsem2_destroy(&tmq->rspSem);
taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
taos_close_internal(tmq->pTscObj);
@ -1121,7 +1121,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->consumerId = tGenIdPI64();
// init semaphore
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) {
tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
SET_ERROR_MSG_TMQ("init t_sem failed")
@ -1132,7 +1132,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) {
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
tsem_destroy(&pTmq->rspSem);
tsem2_destroy(&pTmq->rspSem);
SET_ERROR_MSG_TMQ("init tscObj failed")
goto _failed;
}
@ -1434,7 +1434,7 @@ END:
taosReleaseRef(tmqMgmt.rsetId, refId);
FAIL:
if (tmq) tsem_post(&tmq->rspSem);
if (tmq) tsem2_post(&tmq->rspSem);
taosMemoryFree(pParam);
if (pMsg) taosMemoryFreeClear(pMsg->pData);
if (pMsg) taosMemoryFreeClear(pMsg->pEpSet);
@ -1805,14 +1805,15 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
}
for (int j = 0; j < numOfVg; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms
int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) { // less than 10ms
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
tmq->epoch, pVg->vgId);
continue;
}
if (tmq->replayEnable &&
taosGetTimestampMs() - pVg->blockReceiveTs < pVg->blockSleepForReplay) { // less than 10ms
elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) {
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
continue;
@ -2127,15 +2128,15 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
if (timeout >= 0) {
int64_t currentTime = taosGetTimestampMs();
int64_t elapsedTime = currentTime - startTime;
if (elapsedTime > timeout) {
if (elapsedTime > timeout || elapsedTime < 0) {
tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
tmq->consumerId, tmq->epoch, startTime, currentTime);
return NULL;
}
tsem_timewait(&tmq->rspSem, (timeout - elapsedTime));
tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
} else {
// use tsem_timewait instead of tsem_wait to avoid unexpected stuck
tsem_timewait(&tmq->rspSem, 1000);
tsem2_timewait(&tmq->rspSem, 1000);
}
}
}

View File

@ -2414,10 +2414,21 @@ void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId
if (stbName == NULL){
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
}else{
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%" PRIu64, stbName, groupId);
int32_t i = strlen(stbName) - 1;
for(; i >= 0; i--){
if (stbName[i] == '.'){
break;
}
}
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%"PRIu64, stbName + i + 1, groupId);
}
ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put stbname + groupId to the end
strcat(ctbName, tmp);
for(int i = 0; i < strlen(ctbName); i++){
if(ctbName[i] == '.'){
ctbName[i] = '_';
}
}
}
// auto stream subtable name starts with 't_', followed by the first segment of MD5 digest for group vals.

View File

@ -5962,9 +5962,11 @@ int32_t tSerializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceVgr
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->useless) < 0) return -1;
if (tEncodeI32(&encoder, pReq->reserved) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
ENCODESQL();
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -5977,12 +5979,15 @@ int32_t tDeserializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceV
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->useless) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->reserved) < 0) return -1;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
}
DECODESQL();
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);

View File

@ -20,6 +20,7 @@
#include <tglobal.h>
#include <tmsg.h>
#include <iostream>
#include <tdatablock.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
@ -475,6 +476,45 @@ TEST(testCase, AllNormTest) {
taosMemoryFree(pTSchema);
}
TEST(testCase, StreamAllNormTest) {
char ctbName[TSDB_TABLE_NAME_LEN] = {0};
uint64_t groupId = 12345;
buildCtbNameAddGroupId(NULL, ctbName, groupId);
ASSERT_STREQ("_12345", ctbName);
}
TEST(testCase, StreamWithStbName) {
char stbName[] = "1.table.stb";
char ctbName[TSDB_TABLE_NAME_LEN] = {0};
uint64_t groupId = 12345;
buildCtbNameAddGroupId(stbName, ctbName, groupId);
ASSERT_STREQ("_stb_12345", ctbName);
}
TEST(testCase, StreamWithoutDotInStbName) {
char stbName[] = "table";
char ctbName[TSDB_TABLE_NAME_LEN] = {0};
uint64_t groupId = 12345;
buildCtbNameAddGroupId(stbName, ctbName, groupId);
ASSERT_STREQ("_table_12345", ctbName);
}
TEST(testCase, StreamWithoutDotInStbName2) {
char stbName[] = "";
char ctbName[TSDB_TABLE_NAME_LEN] = {0};
uint64_t groupId = 12345;
buildCtbNameAddGroupId(stbName, ctbName, groupId);
ASSERT_STREQ("__12345", ctbName);
}
#if 1
TEST(testCase, NoneTest) {
const static int nCols = 14;

View File

@ -922,9 +922,16 @@ typedef struct {
SColVal colVal;
} SLastCol;
typedef struct {
int8_t lflag;
STsdbRowKey tsdbRowKey;
SColVal colVal;
} SLastUpdateCtx;
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow, SRow **aRow);
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData);
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);

View File

@ -385,7 +385,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
SWalReader* pWalReader = pReader->pWalReader;
uint64_t st = taosGetTimestampMs();
int64_t st = taosGetTimestampMs();
while (1) {
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) {
@ -413,7 +413,8 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->msg.msgStr = NULL;
if (taosGetTimestampMs() - st > 1000) {
int64_t elapsed = taosGetTimestampMs() - st;
if(elapsed > 1000 || elapsed < 0){
return false;
}

View File

@ -998,46 +998,30 @@ static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal
}
}
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
return 0;
}
int32_t code = 0;
// 1, fetch schema
STSchema *pTSchema = NULL;
int32_t sver = TSDBROW_SVERSION(pRow);
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return -1;
}
// 2, iterate col values into array
SArray *aColVal = taosArrayInit(32, sizeof(SColVal));
STSDBRowIter iter = {0};
tsdbRowIterOpen(&iter, pRow, pTSchema);
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
taosArrayPush(aColVal, pColVal);
}
tsdbRowClose(&iter);
// 3, build keys & multi get from rocks
int num_keys = TARRAY_SIZE(aColVal);
int num_keys = TARRAY_SIZE(updCtxArray);
SArray *remainCols = NULL;
SLRUCache *pCache = pTsdb->lruCache;
STsdbRowKey tsdbRowKey = {0};
tsdbRowGetKey(pRow, &tsdbRowKey);
SRowKey *pRowKey = &tsdbRowKey.key;
taosThreadMutexLock(&pTsdb->lruMutex);
for (int i = 0; i < num_keys; ++i) {
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
int16_t cid = pColVal->cid;
SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i);
SLastKey *key = &(SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};
int8_t lflag = updCtx->lflag;
SRowKey *pRowKey = &updCtx->tsdbRowKey.key;
SColVal *pColVal = &updCtx->colVal;
if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
continue;
}
SLastKey *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
size_t klen = ROCKS_KEY_LEN;
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
if (h) {
@ -1053,23 +1037,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
}
taosArrayPush(remainCols, &(SIdxKey){i, *key});
}
if (COL_VAL_IS_VALUE(pColVal)) {
key->lflag = LFLAG_LAST;
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) {
tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal);
}
taosLRUCacheRelease(pCache, h, false);
} else {
if (!remainCols) {
remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
}
taosArrayPush(remainCols, &(SIdxKey){i, *key});
}
}
}
if (remainCols) {
@ -1097,13 +1064,17 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < num_keys; ++i) {
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx;
// SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx);
SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i);
SRowKey *pRowKey = &updCtx->tsdbRowKey.key;
SColVal *pColVal = &updCtx->colVal;
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]);
SLastCol *PToFree = pLastCol;
if (IS_LAST_ROW_KEY(idxKey->key)) {
if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
continue;
}
int32_t cmp_res = 1;
if (pLastCol) {
cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
@ -1148,49 +1119,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree(value);
}
} else {
if (COL_VAL_IS_VALUE(pColVal)) {
if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) {
char *value = NULL;
size_t vlen = 0;
SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal};
tsdbCacheSerialize(&lastColTmp, &value, &vlen);
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
pLastCol = &lastColTmp;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
size_t charge = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
reallocVarDataVal(pValue);
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
reallocVarData(&pLastCol->colVal);
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge,
tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
taosMemoryFree(value);
}
}
}
taosMemoryFreeClear(PToFree);
rocksdb_free(values_list[i]);
@ -1209,11 +1137,152 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosThreadMutexUnlock(&pTsdb->lruMutex);
_exit:
taosArrayDestroy(aColVal);
taosMemoryFree(pTSchema);
return code;
}
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
SRow **aRow) {
int32_t code = 0;
// 1. prepare last
TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
STSchema *pTSchema = NULL;
int32_t sver = TSDBROW_SVERSION(&lRow);
SArray *ctxArray = NULL;
SSHashObj *iColHash = NULL;
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto _exit;
}
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
int32_t nCol = pTSchema->numOfCols;
ctxArray = taosArrayInit(nCol, sizeof(SLastUpdateCtx));
iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
// 1. prepare by lrow
STsdbRowKey tsdbRowKey = {0};
tsdbRowGetKey(&lRow, &tsdbRowKey);
STSDBRowIter iter = {0};
tsdbRowIterOpen(&iter, &lRow, pTSchema);
int32_t iCol = 0;
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
taosArrayPush(ctxArray, &updateCtx);
if (!COL_VAL_IS_VALUE(pColVal)) {
tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0);
continue;
}
updateCtx.lflag = LFLAG_LAST;
taosArrayPush(ctxArray, &updateCtx);
}
tsdbRowClose(&iter);
// 2. prepare by the other rows
for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
if (tSimpleHashGetSize(iColHash) == 0) {
break;
}
tRow.pTSRow = aRow[iRow];
STsdbRowKey tsdbRowKey = {0};
tsdbRowGetKey(&tRow, &tsdbRowKey);
void *pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
int32_t iCol = ((int32_t *)pIte)[0];
SColVal colVal = COL_VAL_NONE(0, 0);
tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
if (COL_VAL_IS_VALUE(&colVal)) {
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
taosArrayPush(ctxArray, &updateCtx);
tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
}
}
}
// 3. do update
tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
_exit:
taosMemoryFreeClear(pTSchema);
taosArrayDestroy(ctxArray);
tSimpleHashCleanup(iColHash);
return code;
}
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
int32_t code = 0;
TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
STSchema *pTSchema = NULL;
int32_t sver = TSDBROW_SVERSION(&lRow);
SArray *ctxArray = NULL;
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto _exit;
}
ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
// 1. prepare last
TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
SColData *pColData = &pBlockData->aColData[iColData];
if ((pColData->flag & HAS_VALUE) != HAS_VALUE) {
continue;
}
for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) {
STsdbRowKey tsdbRowKey = {0};
tsdbRowGetKey(&tRow, &tsdbRowKey);
uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow);
if (colType == 2) {
SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
tColDataGetValue(pColData, tRow.iRow, &colVal);
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
taosArrayPush(ctxArray, &updateCtx);
break;
}
}
}
// 2. prepare last row
STsdbRowKey tsdbRowKey = {0};
tsdbRowGetKey(&lRow, &tsdbRowKey);
STSDBRowIter iter = {0};
tsdbRowIterOpen(&iter, &lRow, pTSchema);
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
taosArrayPush(ctxArray, &updateCtx);
}
tsdbRowClose(&iter);
// 3. do update
tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
_exit:
taosMemoryFreeClear(pTSchema);
taosArrayDestroy(ctxArray);
return 0;
}
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
int nCols, int16_t *slotIds);
@ -1487,7 +1556,10 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
taosArraySet(pLastArray, idxKey->idx, pLastCol);
// taosArrayRemove(remainCols, i);
if (!pTmpColArray) {
if (/*!pTmpColArray*/ lastTmpIndexArray && !lastTmpColArray) {
continue;
}
if (/*!pTmpColArray*/ lastrowTmpIndexArray && lastrowTmpColArray) {
continue;
}

View File

@ -629,14 +629,12 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
SMemSkipListNode *pos[SL_MAX_LEVEL];
TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
STsdbRowKey key;
TSDBROW lRow; // last row
// first row
tsdbRowGetKey(&tRow, &key);
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
lRow = tRow;
// remain row
++tRow.iRow;
@ -653,7 +651,6 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
}
if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit;
lRow = tRow;
++tRow.iRow;
}
@ -664,7 +661,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
}
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData);
}
// SMemTable
@ -688,7 +685,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
SMemSkipListNode *pos[SL_MAX_LEVEL];
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
int32_t iRow = 0;
TSDBROW lRow;
// backward put first data
tRow.pTSRow = aRow[iRow++];
@ -696,7 +692,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
if (code) goto _exit;
lRow = tRow;
pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
@ -717,8 +712,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1);
if (code) goto _exit;
lRow = tRow;
iRow++;
}
}
@ -727,7 +720,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
pTbData->maxKey = key.key.ts;
}
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow);
}
// SMemTable

View File

@ -47,10 +47,8 @@ extern "C" {
} \
} else { \
(_pKey)->ts = (_pRow)->pBlockData->aTSKEY[(_pRow)->iRow]; \
if ((_pRow)->pBlockData->nColData > 0) { \
tColRowGetPrimaryKey((_pRow)->pBlockData, (_pRow)->iRow, (_pKey)); \
} \
} \
}
typedef enum {

View File

@ -601,11 +601,14 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
STColumn *pTColumn = &pTSchema->columns[iCol];
SValue value;
ASSERT(iCol > 0);
if (pRow->type == TSDBROW_ROW_FMT) {
tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal);
} else if (pRow->type == TSDBROW_COL_FMT) {
if (iCol == 0) {
*pColVal =
COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID,
((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pRow->pBlockData->aTSKEY[pRow->iRow]}));
} else {
SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId);
if (pColData) {
@ -613,6 +616,7 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
} else {
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
}
}
} else {
ASSERT(0);
}

View File

@ -446,7 +446,13 @@ typedef struct STimeWindowAggSupp {
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
typedef struct SSteamOpBasicInfo {
int32_t primaryPkIndex;
bool updateOperatorInfo;
} SSteamOpBasicInfo;
typedef struct SStreamScanInfo {
SSteamOpBasicInfo basic;
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
SExprSupp tbnameCalSup;
@ -568,10 +574,6 @@ typedef struct SOpCheckPointInfo {
SHashObj* children; // key:child id
} SOpCheckPointInfo;
typedef struct SSteamOpBasicInfo {
int32_t primaryPkIndex;
} SSteamOpBasicInfo;
typedef struct SStreamIntervalOperatorInfo {
SOptrBasicInfo binfo; // basic info
SSteamOpBasicInfo basic;

View File

@ -0,0 +1,32 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef STREAM_EXECUTORINT_H
#define STREAM_EXECUTORINT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "executorInt.h"
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type);
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo);
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo);
#ifdef __cplusplus
}
#endif
#endif // STREAM_EXECUTORINT_H

View File

@ -20,6 +20,7 @@
#include "os.h"
#include "querynodes.h"
#include "systable.h"
#include "streamexecutorInt.h"
#include "tname.h"
#include "tdatablock.h"
@ -2426,10 +2427,13 @@ void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) {
if (!pInfo->pState) {
return;
}
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
void* pBuf = NULL;
int32_t len = streamScanOperatorEncode(pInfo, &pBuf);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len);
taosMemoryFree(pBuf);
saveStreamOperatorStateComplete(&pInfo->basic);
}
}
// other properties are recovered from the execution plan
@ -2582,6 +2586,7 @@ FETCH_NEXT_BLOCK:
case STREAM_NORMAL:
case STREAM_GET_ALL:
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
return pBlock;
case STREAM_RETRIEVE: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
@ -2622,6 +2627,7 @@ FETCH_NEXT_BLOCK:
if (pInfo->pDeleteDataRes->info.rows > 0) {
printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pInfo->pDeleteDataRes->info.type);
return pInfo->pDeleteDataRes;
} else {
goto FETCH_NEXT_BLOCK;
@ -2639,6 +2645,7 @@ FETCH_NEXT_BLOCK:
if (pInfo->pDeleteDataRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pInfo->pDeleteDataRes->info.type);
return pInfo->pDeleteDataRes;
} else {
goto FETCH_NEXT_BLOCK;
@ -2652,6 +2659,7 @@ FETCH_NEXT_BLOCK:
break;
}
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
return pBlock;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
@ -2659,6 +2667,7 @@ FETCH_NEXT_BLOCK:
case STREAM_SCAN_FROM_RES: {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
pInfo->pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
@ -2762,6 +2771,7 @@ FETCH_NEXT_BLOCK:
}
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

View File

@ -17,6 +17,7 @@
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "streamexecutorInt.h"
#include "tchecksum.h"
#include "tcommon.h"
#include "tdatablock.h"
@ -415,6 +416,7 @@ void* doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
SStreamCountAggOperatorInfo* pInfo = pOperator->info;
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamCountEncodeOpState(NULL, 0, pOperator, true);
void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf;
@ -422,6 +424,8 @@ void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), buf, len);
taosMemoryFree(buf);
saveStreamOperatorStateComplete(&pInfo->basic);
}
}
void doResetCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock) {
@ -550,6 +554,7 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
break;
}
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
bool add = pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator);

View File

@ -18,6 +18,7 @@
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "streamexecutorInt.h"
#include "tchecksum.h"
#include "tcommon.h"
#include "tcompare.h"
@ -458,6 +459,7 @@ void* doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator);
void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf;
@ -465,6 +467,8 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME,
strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), buf, len);
taosMemoryFree(buf);
saveStreamOperatorStateComplete(&pInfo->basic);
}
}
static SSDataBlock* buildEventResult(SOperatorInfo* pOperator) {
@ -531,6 +535,7 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
break;
}
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {

View File

@ -0,0 +1,30 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type) {
if (type != STREAM_GET_ALL && type != STREAM_CHECKPOINT) {
pBasicInfo->updateOperatorInfo = true;
}
}
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo) {
return pBasicInfo->updateOperatorInfo;
}
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->updateOperatorInfo = false;
}

View File

@ -18,6 +18,7 @@
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "streamexecutorInt.h"
#include "tchecksum.h"
#include "tcommon.h"
#include "tcompare.h"
@ -1211,6 +1212,7 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator);
void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf;
@ -1218,6 +1220,8 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len);
taosMemoryFree(buf);
saveStreamOperatorStateComplete(&pInfo->basic);
}
}
static void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins) {
@ -1347,6 +1351,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
pInfo->numOfDatapack++;
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type;
@ -2690,6 +2695,7 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true);
void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf;
@ -2697,6 +2703,8 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len);
taosMemoryFree(buf);
saveStreamOperatorStateComplete(&pInfo->basic);
}
}
void resetUnCloseSessionWinInfo(SSHashObj* winMap) {
@ -2766,6 +2774,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
break;
}
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
@ -3176,6 +3185,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
break;
}
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
@ -3673,6 +3683,7 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true);
void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf;
@ -3680,6 +3691,8 @@ void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) {
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME,
strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len);
taosMemoryFree(buf);
saveStreamOperatorStateComplete(&pInfo->basic);
}
}
static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) {
@ -3746,6 +3759,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
break;
}
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
@ -4069,6 +4083,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
pInfo->numOfDatapack++;
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
@ -4465,6 +4480,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
}
pInfo->numOfDatapack++;
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type;

View File

@ -1651,6 +1651,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
SSysTableScanInfo* pInfo = pOperator->info;
char dbName[TSDB_DB_NAME_LEN] = {0};
while (1) {
if (isTaskKilled(pOperator->pTaskInfo)) {
setOperatorCompleted(pOperator);
return NULL;
@ -1692,11 +1694,15 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
}
return pBlock->info.rows > 0 ? pBlock : NULL;
if (pBlock->info.rows == 0) {
continue;
}
return pBlock;
} else {
return NULL;
}
}
}
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
SSDataBlock* pBlock) {

View File

@ -187,6 +187,8 @@ const char* nodesNodeName(ENodeType type) {
return "BalanceVgroupStmt";
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
return "BalanceVgroupLeaderStmt";
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT:
return "BalanceVgroupLeaderStmt";
case QUERY_NODE_MERGE_VGROUP_STMT:
return "MergeVgroupStmt";
case QUERY_NODE_SHOW_DB_ALIVE_STMT:
@ -7607,6 +7609,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to serialize.
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to serialize.
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT:
return TSDB_CODE_SUCCESS;
case QUERY_NODE_MERGE_VGROUP_STMT:
return mergeVgroupStmtToJson(pObj, pJson);
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
@ -7953,6 +7957,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_BALANCE_VGROUP_STMT:
return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to deserialize.
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
return TSDB_CODE_SUCCESS;
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT:
return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to deserialize.
case QUERY_NODE_MERGE_VGROUP_STMT:
return jsonToMergeVgroupStmt(pJson, pObj);

View File

@ -473,6 +473,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SBalanceVgroupStmt));
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
return makeNode(type, sizeof(SBalanceVgroupLeaderStmt));
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT:
return makeNode(type, sizeof(SBalanceVgroupLeaderStmt));
case QUERY_NODE_MERGE_VGROUP_STMT:
return makeNode(type, sizeof(SMergeVgroupStmt));
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
@ -1161,6 +1163,7 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_RESUME_STREAM_STMT: // no pointer field
case QUERY_NODE_BALANCE_VGROUP_STMT: // no pointer field
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: // no pointer field
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: // no pointer field
case QUERY_NODE_MERGE_VGROUP_STMT: // no pointer field
break;
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:

View File

@ -274,6 +274,7 @@ SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId
SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId);
SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt);
SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgId);
SNode* createBalanceVgroupLeaderDBNameStmt(SAstCreateContext* pCxt, const SToken* pDbName);
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2);
SNode* createRedistributeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId, SNodeList* pDnodes);
SNode* createSplitVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId);

View File

@ -773,6 +773,7 @@ cmd ::= KILL COMPACT NK_INTEGER(A).
/************************************************ merge/redistribute/ vgroup ******************************************/
cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); }
cmd ::= BALANCE VGROUP LEADER on_vgroup_id(A). { pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt, &A); }
cmd ::= BALANCE VGROUP LEADER DATABASE db_name(A). { pCxt->pRootNode = createBalanceVgroupLeaderDBNameStmt(pCxt, &A); }
cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); }
cmd ::= REDISTRIBUTE VGROUP NK_INTEGER(A) dnode_list(B). { pCxt->pRootNode = createRedistributeVgroupStmt(pCxt, &A, B); }
cmd ::= SPLIT VGROUP NK_INTEGER(A). { pCxt->pRootNode = createSplitVgroupStmt(pCxt, &A); }

View File

@ -2809,6 +2809,16 @@ SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgI
return (SNode*)pStmt;
}
SNode* createBalanceVgroupLeaderDBNameStmt(SAstCreateContext* pCxt, const SToken* pDbName){
CHECK_PARSER_STATUS(pCxt);
SBalanceVgroupLeaderStmt* pStmt = (SBalanceVgroupLeaderStmt*)nodesMakeNode(QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT);
CHECK_OUT_OF_MEM(pStmt);
if (NULL != pDbName) {
COPY_STRING_FORM_ID_TOKEN(pStmt->dbName, pDbName);
}
return (SNode*)pStmt;
}
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2) {
CHECK_PARSER_STATUS(pCxt);
SMergeVgroupStmt* pStmt = (SMergeVgroupStmt*)nodesMakeNode(QUERY_NODE_MERGE_VGROUP_STMT);

View File

@ -10578,6 +10578,7 @@ static int32_t translateBalanceVgroup(STranslateContext* pCxt, SBalanceVgroupStm
static int32_t translateBalanceVgroupLeader(STranslateContext* pCxt, SBalanceVgroupLeaderStmt* pStmt) {
SBalanceVgroupLeaderReq req = {0};
req.vgId = pStmt->vgId;
if(pStmt->dbName != NULL) strcpy(req.db, pStmt->dbName);
int32_t code =
buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP_LEADER, (FSerializeFunc)tSerializeSBalanceVgroupLeaderReq, &req);
tFreeSBalanceVgroupLeaderReq(&req);
@ -11263,6 +11264,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
code = translateBalanceVgroupLeader(pCxt, (SBalanceVgroupLeaderStmt*)pNode);
break;
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT:
code = translateBalanceVgroupLeader(pCxt, (SBalanceVgroupLeaderStmt*)pNode);
break;
case QUERY_NODE_MERGE_VGROUP_STMT:
code = translateMergeVgroup(pCxt, (SMergeVgroupStmt*)pNode);
break;

File diff suppressed because it is too large Load Diff

View File

@ -246,11 +246,10 @@ static bool scanPathOptMayBeOptimized(SLogicNode* pNode) {
static bool scanPathOptShouldGetFuncs(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
if (pNode->pParent && QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) {
if (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType) return true;
} else {
if (!pNode->pParent || QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) ||
WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType)
return !scanPathOptHaveNormalCol(((SPartitionLogicNode*)pNode)->pPartitionKeys);
}
return false;
}
if ((QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) &&

View File

@ -247,6 +247,7 @@ int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId);
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
int32_t taskDbBuildSnap(void* arg, SArray* pSnap);
int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);

View File

@ -146,6 +146,9 @@ static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter,
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
int32_t copyFiles(const char* src, const char* dst);
uint32_t nextPow2(uint32_t x);
@ -1010,6 +1013,50 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
}
#endif
int chkpIdComp(const void* a, const void* b) {
int64_t x = *(int64_t*)a;
int64_t y = *(int64_t*)b;
if (x == y) return 0;
return x < y ? -1 : 1;
}
int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
int32_t code = 0;
char* pChkpDir = taosMemoryCalloc(1, 256);
sprintf(pChkpDir, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
if (!taosIsDir(pChkpDir)) {
taosMemoryFree(pChkpDir);
return 0;
}
TdDirPtr pDir = taosOpenDir(pChkpDir);
if (pDir == NULL) {
taosMemoryFree(pChkpDir);
return 0;
}
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
if (taosDirEntryIsDir(de)) {
char checkpointPrefix[32] = {0};
int64_t checkpointId = 0;
int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId);
if (ret == 1) {
taosArrayPush(pBackend->chkpSaved, &checkpointId);
}
} else {
continue;
}
}
taosArraySort(pBackend->chkpSaved, chkpIdComp);
taosMemoryFree(pChkpDir);
taosCloseDir(&pDir);
return 0;
}
int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) {
SArray* pHandle = taosArrayInit(8, POINTER_BYTES);
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
@ -1043,8 +1090,7 @@ int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) {
taosMemoryFreeClear(err);
goto _ERROR;
}
rocksdb_checkpoint_create(cp, path, 64 << 20, &err);
rocksdb_checkpoint_create(cp, path, UINT64_MAX, &err);
if (err != NULL) {
stError("failed to do checkpoint at:%s, reason:%s", path, err);
taosMemoryFreeClear(err);
@ -1056,7 +1102,6 @@ _ERROR:
return code;
}
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
if (nCf == 0) return 0;
int code = 0;
char* err = NULL;
@ -1109,9 +1154,12 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter;
taskDbAddRef(pTaskDb);
code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId);
int64_t chkpId = pTaskDb->chkpId;
code = taskDbDoCheckpoint(pTaskDb, chkpId);
taskDbRemoveRef(pTaskDb);
taskDbRefChkp(pTaskDb, pTaskDb->chkpId);
SStreamTask* pTask = pTaskDb->pTask;
SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
.taskId = pTask->id.taskId,
@ -1124,6 +1172,28 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
return code;
}
int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
if (pSnapInfo == NULL) return 0;
SStreamMeta* pMeta = arg;
int32_t code = 0;
taosThreadMutexLock(&pMeta->backendMutex);
char buf[128] = {0};
for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i);
sprintf(buf, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId);
STaskDbWrapper* pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf));
if (pTaskDb == NULL) {
stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId);
continue;
}
memset(buf, 0, sizeof(buf));
taskDbUnRefChkp(pTaskDb, pSnap->chkpId);
}
taosThreadMutexUnlock(&pMeta->backendMutex);
return 0;
}
#ifdef BUILD_NO_CALL
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) {
// if (arg == NULL) return 0;
@ -1192,20 +1262,23 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) {
int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf);
stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf);
if ((code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf)) == 0) {
int64_t written = atomic_load_64(&pTaskDb->dataWritten);
if (written > 0) {
stDebug("stream backend:%p start to flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written);
code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf);
} else {
stDebug("stream backend:%p not need flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written);
}
if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) {
stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir);
} else {
stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
taosGetTimestampMs() - st);
}
} else {
stError("stream backend:%p failed to flush db at:%s", pTaskDb, pChkpIdDir);
}
code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir);
pTaskDb->dataWritten = 0;
atomic_store_64(&pTaskDb->dataWritten, 0);
pTaskDb->chkpId = chkpId;
_EXIT:
@ -1846,7 +1919,7 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
rocksdb_options_set_create_if_missing(opts, 1);
rocksdb_options_set_create_missing_column_families(opts, 1);
// rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
rocksdb_options_set_recycle_log_file_num(opts, 6);
// rocksdb_options_set_ecycle_log_file_num(opts, 6);
rocksdb_options_set_max_write_buffer_number(opts, 3);
rocksdb_options_set_info_log_level(opts, 1);
rocksdb_options_set_db_write_buffer_size(opts, 256 << 20);
@ -1900,11 +1973,32 @@ void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) {
pTaskDb->chkpId = -1;
pTaskDb->chkpCap = 4;
pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t));
taskDbLoadChkpInfo(pTaskDb);
pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t));
taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL);
}
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
taosArrayPush(pTaskDb->chkpInUse, &chkp);
taosArraySort(pTaskDb->chkpInUse, chkpIdComp);
taosThreadRwlockUnlock(&pTaskDb->chkpDirLock);
}
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
for (int i = 0; i < taosArrayGetSize(pTaskDb->chkpInUse); i++) {
int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i);
if (*p == chkp) {
taosArrayRemove(pTaskDb->chkpInUse, i);
break;
}
}
taosThreadRwlockUnlock(&pTaskDb->chkpDirLock);
}
void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) {
taosArrayDestroy(pTaskDb->chkpSaved);
taosArrayDestroy(pTaskDb->chkpInUse);
@ -2131,15 +2225,18 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
return code;
}
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
int32_t code = -1;
STaskDbWrapper* pDb = arg;
ECHECKPOINT_BACKUP_TYPE utype = type;
taskDbRefChkp(pDb, chkpId);
if (utype == DATA_UPLOAD_RSYNC) {
return taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
} else if (utype == DATA_UPLOAD_S3) {
return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list);
code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list);
}
return -1;
taskDbUnRefChkp(pDb, chkpId);
return code;
}
int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) {
@ -2563,7 +2660,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
break; \
} \
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
wrapper->dataWritten += 1; \
atomic_add_fetch_64(&wrapper->dataWritten, 1); \
char toString[128] = {0}; \
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
@ -2640,7 +2737,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
break; \
} \
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
wrapper->dataWritten += 1; \
atomic_add_fetch_64(&wrapper->dataWritten, 1); \
char toString[128] = {0}; \
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
@ -2681,7 +2778,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
stDebug("streamStateClear_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
wrapper->dataWritten += 1;
atomic_add_fetch_64(&wrapper->dataWritten, 1);
char sKeyStr[128] = {0};
char eKeyStr[128] = {0};
@ -2740,9 +2837,11 @@ int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
if (pKey->groupId == groupId) {
return 0;
}
if (pVal != NULL) {
taosMemoryFree((void*)*pVal);
*pVal = NULL;
}
}
return -1;
}
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
@ -3705,7 +3804,7 @@ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rock
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl) {
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
wrapper->dataWritten += 1;
atomic_add_fetch_64(&wrapper->dataWritten, 1);
int i = streamStateGetCfIdx(pState, cfKeyName);
if (i < 0) {
@ -3739,7 +3838,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
wrapper->dataWritten += 1;
atomic_add_fetch_64(&wrapper->dataWritten, 1);
rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
@ -3758,7 +3858,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
char* err = NULL;
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
wrapper->dataWritten += 1;
atomic_add_fetch_64(&wrapper->dataWritten, 1);
rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err);
if (err != NULL) {
stError("streamState failed to write batch, err:%s", err);

View File

@ -74,7 +74,9 @@ struct SStreamSnapHandle {
int32_t currFileIdx;
char* metaPath;
void* pMeta;
SArray* pDbSnapSet;
SArray* pSnapInfoSet;
int32_t currIdx;
int8_t delFlag; // 0 : not del, 1: del
};
@ -140,7 +142,9 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
return taosOpenFile(fullname, opt);
}
int32_t streamTaskDbGetSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); }
int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); }
int32_t streamDestroyTasdDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); }
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (qDebugFlag & DEBUG_DEBUG) {
@ -291,30 +295,26 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
}
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) {
// impl later
SArray* pSnapSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
int32_t code = streamTaskDbGetSnapInfo(pMeta, path, pSnapSet);
SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet);
if (code != 0) {
return -1;
}
SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfoSet, i);
SBackendSnapFile2 snapFile = {0};
code = streamBackendSnapInitFile(path, pSnap, &snapFile);
ASSERT(code == 0);
taosArrayPush(pDbSnapSet, &snapFile);
}
for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
taosMemoryFree(pSnap->dbPrefixPath);
}
taosArrayDestroy(pSnapSet);
pHandle->pDbSnapSet = pDbSnapSet;
pHandle->pSnapInfoSet = pSnapInfoSet;
pHandle->currIdx = 0;
pHandle->pMeta = pMeta;
return 0;
_err:
@ -333,6 +333,14 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
}
taosArrayDestroy(handle->pDbSnapSet);
}
streamDestroyTasdDbSnapInfo(handle->pMeta, handle->pSnapInfoSet);
if (handle->pSnapInfoSet) {
for (int32_t i = 0; i < taosArrayGetSize(handle->pSnapInfoSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(handle->pSnapInfoSet, i);
taosMemoryFree(pSnap->dbPrefixPath);
}
taosArrayDestroy(handle->pSnapInfoSet);
}
taosMemoryFree(handle->metaPath);
return;
}

View File

@ -557,7 +557,6 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
const int32_t BATCH_LIMIT = 256;
int64_t st = taosGetTimestampMs();
int32_t numOfElems = listNEles(pSnapshot);
SListNode* pNode = NULL;
int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
@ -589,8 +588,11 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
}
taosMemoryFree(buf);
if (streamStateGetBatchSize(batch) > 0) {
int32_t numOfElems = streamStateGetBatchSize(batch);
if (numOfElems > 0) {
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
} else {
goto _end;
}
streamStateClearBatch(batch);
@ -609,6 +611,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
}
_end:
streamStateDestroyBatch(batch);
return code;
}

View File

@ -123,7 +123,7 @@ typedef struct {
STransMsg* pRsp;
SEpSet epSet;
int8_t hasEpSet;
tsem_t* pSem;
tsem2_t* pSem;
int8_t inited;
SRWLatch latch;
} STransSyncMsg;
@ -302,8 +302,8 @@ int transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf);
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
bool transReadComplete(SConnBuffer* connBuf);
int transResetBuffer(SConnBuffer* connBuf);
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf);
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf);
int transSetConnOption(uv_tcp_t* stream, int keepalive);

View File

@ -382,13 +382,18 @@ void cliHandleResp(SCliConn* conn) {
STransMsgHead* pHead = NULL;
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1;
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, resetBuf);
if (msgLen <= 0) {
taosMemoryFree(pHead);
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
return;
}
if (resetBuf == 0) {
tTrace("%s conn %p not reset read buf", transLabel(pTransInst), conn);
}
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
}
@ -2450,7 +2455,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pSyncMsg->hasEpSet = 1;
epsetAssign(&pSyncMsg->epSet, &pCtx->epSet);
}
tsem_post(pSyncMsg->pSem);
tsem2_post(pSyncMsg->pSem);
taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef);
} else {
rpcFreeCont(pResp->pCont);
@ -2679,8 +2684,8 @@ _RETURN:
return ret;
}
int64_t transCreateSyncMsg(STransMsg* pTransMsg) {
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
tsem_init(sem, 0, 0);
tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t));
tsem2_init(sem, 0, 0);
STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg));
@ -2740,7 +2745,7 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr
goto _RETURN;
}
ret = tsem_timewait(pSyncMsg->pSem, timeoutMs);
ret = tsem2_timewait(pSyncMsg->pSem, timeoutMs);
if (ret < 0) {
pRsp->code = TSDB_CODE_TIMEOUT_ERROR;
ret = TSDB_CODE_TIMEOUT_ERROR;

View File

@ -126,7 +126,7 @@ int transClearBuffer(SConnBuffer* buf) {
return 0;
}
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) {
static const int HEADSIZE = sizeof(STransMsgHead);
SConnBuffer* p = connBuf;
@ -137,7 +137,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
if (total >= HEADSIZE && !p->invalid) {
*buf = taosMemoryCalloc(1, total);
memcpy(*buf, p->buf, total);
if (transResetBuffer(connBuf) < 0) {
if (transResetBuffer(connBuf, resetBuf) < 0) {
return -1;
}
} else {
@ -146,7 +146,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
return total;
}
int transResetBuffer(SConnBuffer* connBuf) {
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
SConnBuffer* p = connBuf;
if (p->total < p->len) {
int left = p->len - p->total;
@ -159,9 +159,11 @@ int transResetBuffer(SConnBuffer* connBuf) {
p->total = 0;
p->len = 0;
if (p->cap > BUFFER_CAP) {
if (resetBuf) {
p->cap = BUFFER_CAP;
p->buf = taosMemoryRealloc(p->buf, p->cap);
}
}
} else {
ASSERTS(0, "invalid read from sock buf");
return -1;
@ -681,7 +683,7 @@ void transDestroySyncMsg(void* msg) {
if (msg == NULL) return;
STransSyncMsg* pSyncMsg = msg;
tsem_destroy(pSyncMsg->pSem);
tsem2_destroy(pSyncMsg->pSem);
taosMemoryFree(pSyncMsg->pSem);
transFreeMsg(pSyncMsg->pRsp->pCont);
taosMemoryFree(pSyncMsg->pRsp);

View File

@ -342,11 +342,15 @@ static bool uvHandleReq(SSvrConn* pConn) {
STransMsgHead* pHead = NULL;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead);
int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, resetBuf);
if (msgLen <= 0) {
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
return false;
}
if (resetBuf == 0) {
tTrace("%s conn %p not reset read buf", transLabel(pTransInst), pConn);
}
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
@ -676,6 +680,7 @@ static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) {
taosMemoryFree(smsg);
}
static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); }
static void destroyAllConn(SWorkThrd* pThrd) {
tTrace("thread %p destroy all conn ", pThrd);
while (!QUEUE_IS_EMPTY(&pThrd->conn)) {

View File

@ -215,14 +215,6 @@ int32_t taosGetAppName(char* name, int32_t* len) {
return 0;
}
int32_t tsem_wait(tsem_t* sem) {
int ret = 0;
do {
ret = sem_wait(sem);
} while (ret != 0 && errno == EINTR);
return ret;
}
int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
int ret = 0;
@ -241,4 +233,101 @@ int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
return ret;
}
int32_t tsem_wait(tsem_t* sem) {
int ret = 0;
do {
ret = sem_wait(sem);
} while (ret != 0 && errno == EINTR);
return ret;
}
int tsem2_init(tsem2_t* sem, int pshared, unsigned int value) {
int ret = taosThreadMutexInit(&sem->mutex, NULL);
if (ret != 0) return ret;
ret = taosThreadCondAttrInit(&sem->attr);
if (ret != 0)
{
taosThreadMutexDestroy(&sem->mutex);
return ret;
}
ret = taosThreadCondAttrSetclock(&sem->attr, CLOCK_MONOTONIC);
if (ret != 0)
{
taosThreadMutexDestroy(&sem->mutex);
taosThreadCondAttrDestroy(&sem->attr);
return ret;
}
ret = taosThreadCondInit(&sem->cond, &sem->attr);
if (ret != 0)
{
taosThreadMutexDestroy(&sem->mutex);
taosThreadCondAttrDestroy(&sem->attr);
return ret;
}
sem->count = value;
return 0;
}
int tsem2_post(tsem2_t *sem) {
taosThreadMutexLock(&sem->mutex);
sem->count++;
taosThreadCondSignal(&sem->cond);
taosThreadMutexUnlock(&sem->mutex);
return 0;
}
int tsem2_destroy(tsem2_t* sem) {
taosThreadMutexDestroy(&sem->mutex);
taosThreadCondDestroy(&sem->cond);
taosThreadCondAttrDestroy(&sem->attr);
return 0;
}
int32_t tsem2_wait(tsem2_t* sem) {
taosThreadMutexLock(&sem->mutex);
while (sem->count <= 0) {
int ret = taosThreadCondWait(&sem->cond, &sem->mutex);
if (0 == ret) {
continue;
} else {
taosThreadMutexUnlock(&sem->mutex);
return ret;
}
}
sem->count--;
taosThreadMutexUnlock(&sem->mutex);
return 0;
}
int32_t tsem2_timewait(tsem2_t* sem, int64_t ms) {
int ret = 0;
taosThreadMutexLock(&sem->mutex);
if (sem->count <= 0) {
struct timespec ts = {0};
if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1) {
taosThreadMutexUnlock(&sem->mutex);
return -1;
}
ts.tv_sec += ms / 1000;
ts.tv_nsec += (ms % 1000) * 1000000;
ts.tv_sec += ts.tv_nsec / 1000000000;
ts.tv_nsec %= 1000000000;
while (sem->count <= 0) {
ret = taosThreadCondTimedWait(&sem->cond, &sem->mutex, &ts);
if (ret != 0) {
taosThreadMutexUnlock(&sem->mutex);
return ret;
}
}
}
sem->count--;
taosThreadMutexUnlock(&sem->mutex);
return ret;
}
#endif

View File

@ -170,6 +170,16 @@ int32_t taosThreadCondAttrInit(TdThreadCondAttr *attr) {
#endif
}
int32_t taosThreadCondAttrSetclock(TdThreadCondAttr *attr, int clockId) {
#ifdef __USE_WIN_THREAD
return 0;
#elif defined(__APPLE__)
return 0;
#else
return pthread_condattr_setclock(attr, clockId);
#endif
}
int32_t taosThreadCondAttrSetPshared(TdThreadCondAttr *attr, int32_t pshared) {
#ifdef __USE_WIN_THREAD
return 0;

View File

@ -71,7 +71,6 @@ TEST(osSemaphoreTests, WaitTime1) {
tsem_destroy(&sem);
}
TEST(osSemaphoreTests, WaitAndPost) {
tsem_t sem;
int result = tsem_init(&sem, 0, 0);
@ -106,3 +105,135 @@ TEST(osSemaphoreTests, TimedWait) {
result = tsem_destroy(&sem);
EXPECT_EQ(result, 0);
}
TEST(osSemaphoreTests, Performance1_1) {
tsem_t sem;
const int count = 100000;
tsem_init(&sem, 0, 0);
std::thread([&sem, count]() {
for (int i = 0; i < count; ++i) {
tsem_post(&sem);
}
}).detach();
for (int i = 0; i < count; ++i) {
tsem_wait(&sem);
}
tsem_destroy(&sem);
}
TEST(osSemaphoreTests, Performance1_2) {
tsem2_t sem;
const int count = 100000;
tsem2_init(&sem, 0, 0);
std::thread([&sem, count]() {
for (int i = 0; i < count; ++i) {
tsem2_post(&sem);
}
}).detach();
for (int i = 0; i < count; ++i) {
tsem2_wait(&sem);
}
tsem2_destroy(&sem);
}
TEST(osSemaphoreTests, Performance2_1) {
tsem_t sem;
const int count = 50000;
tsem_init(&sem, 0, 0);
std::thread([&sem, count]() {
for (int i = 0; i < count; ++i) {
tsem_post(&sem);
}
}).detach();
std::thread([&sem, count]() {
for (int i = 0; i < count; ++i) {
tsem_post(&sem);
}
}).detach();
for (int i = 0; i < count * 2; ++i) {
tsem_wait(&sem);
}
tsem_destroy(&sem);
}
TEST(osSemaphoreTests, Performance2_2) {
tsem2_t sem;
const int count = 50000;
tsem2_init(&sem, 0, 0);
std::thread([&sem, count]() {
for (int i = 0; i < count; ++i) {
tsem2_post(&sem);
}
}).detach();
std::thread([&sem, count]() {
for (int i = 0; i < count; ++i) {
tsem2_post(&sem);
}
}).detach();
for (int i = 0; i < count * 2; ++i) {
tsem2_wait(&sem);
}
tsem2_destroy(&sem);
}
TEST(osSemaphoreTests, Performance3_1) {
const int count = 100000;
for (int i = 0; i < count; ++i) {
tsem_t sem;
tsem_init(&sem, 0, 1);
EXPECT_EQ(tsem_timewait(&sem, 1000), 0);
tsem_destroy(&sem);
}
}
TEST(osSemaphoreTests, Performance3_2) {
const int count = 100000;
for (int i = 0; i < count; ++i) {
tsem2_t sem;
tsem2_init(&sem, 0, 1);
EXPECT_EQ(tsem2_timewait(&sem, 1000), 0);
tsem2_destroy(&sem);
}
}
TEST(osSemaphoreTests, Performance4_1) {
const int count = 1000;
for (int i = 0; i < count; ++i) {
tsem_t sem;
tsem_init(&sem, 0, 0);
std::thread([&sem, count]() {
tsem_post(&sem);
}).detach();
tsem_timewait(&sem, 1000);
tsem_destroy(&sem);
}
}
TEST(osSemaphoreTests, Performance4_2) {
const int count = 1000;
for (int i = 0; i < count; ++i) {
tsem2_t sem;
tsem2_init(&sem, 0, 0);
std::thread([&sem, count]() {
tsem2_post(&sem);
}).detach();
tsem2_timewait(&sem, 1000);
tsem2_destroy(&sem);
}
}

View File

@ -0,0 +1,86 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import random
import taos
import frame
import frame.etool
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
class TDTestCase(TBase):
def checkGeometry(self):
tdLog.info(f"check geometry")
tdSql.execute("create database db_geometry;")
tdSql.execute("use db_geometry;")
tdSql.execute("create table t_ge (ts timestamp, id int, c1 GEOMETRY(512));")
tdSql.execute("insert into t_ge values(now, 1, 'MULTIPOINT ((0 0), (1 1))');")
tdSql.execute("insert into t_ge values(now, 1, 'MULTIPOINT (0 0, 1 1)');")
tdSql.execute("insert into t_ge values(now, 2, 'POINT (0 0)');")
tdSql.execute("insert into t_ge values(now, 2, 'POINT EMPTY');")
tdSql.execute("insert into t_ge values(now, 3, 'LINESTRING (0 0, 0 1, 1 2)');")
tdSql.execute("insert into t_ge values(now, 3, 'LINESTRING EMPTY');")
tdSql.execute("insert into t_ge values(now, 4, 'POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))');")
tdSql.execute("insert into t_ge values(now, 4, 'POLYGON ((0 0, 4 0, 4 4, 0 4, 0 0), (1 1, 1 2, 2 2, 2 1, 1 1))');")
tdSql.execute("insert into t_ge values(now, 4, 'POLYGON EMPTY');")
tdSql.execute("insert into t_ge values(now, 5, 'MULTILINESTRING ((0 0, 1 1), (2 2, 3 3))');")
tdSql.execute("insert into t_ge values(now, 6, 'MULTIPOLYGON (((1 1, 1 3, 3 3, 3 1, 1 1)), ((4 3, 6 3, 6 1, 4 1, 4 3)))');")
tdSql.execute("insert into t_ge values(now, 7, 'GEOMETRYCOLLECTION (MULTIPOINT((0 0), (1 1)), POINT(3 4), LINESTRING(2 3, 3 4))');")
tdSql.query("select * from t_ge;")
tdSql.checkRows(12)
tdSql.query("select * from t_ge where id=1;")
tdSql.checkRows(2)
tdSql.query("select * from t_ge where id=2;")
tdSql.checkRows(2)
tdSql.query("select * from t_ge where id=3;")
tdSql.checkRows(2)
tdSql.query("select * from t_ge where id=4;")
tdSql.checkRows(3)
tdSql.query("select * from t_ge where id=5;")
tdSql.checkRows(1)
tdSql.query("select * from t_ge where id=6;")
tdSql.checkRows(1)
tdSql.query("select * from t_ge where id=7;")
tdSql.checkRows(1)
def checkDataType(self):
tdLog.info(f"check datatype")
self.checkGeometry()
# run
def run(self):
tdLog.debug(f"start to excute {__file__}")
# check insert datatype
self.checkDataType()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -11,7 +11,7 @@
# army-test
#
,,y,army,./pytest.sh python3 ./test.py -f enterprise/multi-level/mlevel_basic.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f enterprise/s3/s3Basic.py -N 3
,,n,army,python3 ./test.py -f enterprise/s3/s3Basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/snapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f community/query/function/test_func_elapsed.py
,,y,army,./pytest.sh python3 ./test.py -f community/query/test_join.py
@ -20,6 +20,7 @@
,,y,army,./pytest.sh python3 ./test.py -f community/query/fill/fill_desc.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/incSnapshot.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/query/query_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/insert/insert_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/splitVgroupByLearner.py -N 3
,,n,army,python3 ./test.py -f community/cmdline/fullopt.py
,,n,army,python3 ./test.py -f community/query/show.py -N 3

View File

@ -79,7 +79,7 @@ md5sum /home/TDinternal/debug/build/lib/libtaos.so
#define taospy 2.7.10
pip3 list|grep taospy
pip3 uninstall taospy -y
pip3 install --default-timeout=120 taospy==2.7.13
pip3 install --default-timeout=120 taospy==2.7.15
#define taos-ws-py 0.3.1
pip3 list|grep taos-ws-py

View File

@ -135,4 +135,177 @@ endi
print =========================== td-24781
sql select DISTINCT (`precision`) from `information_schema`.`ins_databases` PARTITION BY `precision`
print =========================ins_stables
print create database test vgroups 4;
sql create database test vgroups 4;
sql use test;
sql create stable st1(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st2(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st3(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st4(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st5(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st6(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st7(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st8(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st9(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st10(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st11(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st12(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st13(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st14(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st15(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st16(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st17(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st18(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st19(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st20(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st21(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st22(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st23(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st24(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st25(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st26(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st27(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st28(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st29(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st30(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st31(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st32(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st33(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st34(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st35(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st36(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st37(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st38(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st39(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st40(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st41(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st42(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st43(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st44(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st45(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st46(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st47(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st48(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st49(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st50(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st51(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st52(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st53(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st54(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st55(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st56(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st57(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st58(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st59(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st60(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st61(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st62(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st63(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st64(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st65(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st66(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st67(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st68(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st69(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st70(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
print create database test1 vgroups 4;
sql create database test1 vgroups 4;
sql use test1;
sql create stable st1(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st2(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st3(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st4(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st5(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st6(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st7(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st8(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st9(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st10(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st11(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st12(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st13(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st14(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st15(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st16(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st17(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st18(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st19(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st20(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st21(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st22(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st23(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st24(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st25(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st26(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st27(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st28(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st29(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st30(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st31(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st32(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st33(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st34(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st35(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st36(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st37(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st38(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st39(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st40(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st41(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st42(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st43(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st44(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st45(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st46(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st47(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st48(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st49(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st50(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st51(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st52(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st53(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st54(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st55(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st56(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st57(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st58(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st59(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st60(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st61(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st62(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st63(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st64(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st65(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st66(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st67(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st68(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st69(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stable st70(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sleep 1000
sql select * from information_schema.ins_stables where db_name = "test" limit 68,32;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
if $rows != 2 then
return -1
endi
sql select * from information_schema.ins_stables where db_name = "test1" limit 68,32;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
if $rows != 2 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,248 @@
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"connection_pool_size": 10,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"prepared_rand": 10,
"chinese": "no",
"insert_interval": 0,
"num_of_records_per_req": 10,
"databases": [{
"dbinfo": {
"name": "db_all_insert_mode",
"drop": "yes"
},
"super_tables": [{
"name": "sml_json",
"child_table_exists":"no",
"childtable_count": 8,
"childtable_prefix": "stb1_",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "rand",
"insert_mode": "sml",
"line_protocol": "json",
"childtable_limit": 0,
"childtable_offset": 0,
"insert_rows": 2,
"insert_interval": 0,
"interlace_rows": 0,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "now",
"sample_file": "",
"use_sample_ts": "no",
"tags_file": "",
"columns": [{"type": "BOOL"}],
"tags": [{"type": "INT"}, {"type": "BIGINT"}, {"type": "FLOAT"}, {"type": "DOUBLE"}, {"type": "SMALLINT"}, {"type": "TINYINT"}, {"type": "BOOL"}, {"type": "NCHAR","len": 17, "count":1}, {"type": "BINARY", "len": 19, "count":1}]
},{
"name": "sml_line",
"child_table_exists":"no",
"childtable_count": 8,
"childtable_prefix": "stb2_",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "rand",
"insert_mode": "sml",
"line_protocol": "line",
"childtable_limit": 0,
"childtable_offset": 0,
"insert_rows": 2,
"insert_interval": 0,
"interlace_rows": 0,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "now",
"sample_file": "",
"use_sample_ts": "no",
"tags_file": "",
"columns": [{"type": "TINYINT"}],
"tags": [{"type": "INT"}, {"type": "BIGINT"}, {"type": "FLOAT"}, {"type": "DOUBLE"}, {"type": "SMALLINT"}, {"type": "TINYINT"}, {"type": "BOOL"}, {"type": "NCHAR","len": 17, "count":1}, {"type": "BINARY", "len": 19, "count":1}]
},{
"name": "sml_telnet",
"child_table_exists":"no",
"childtable_count": 8,
"childtable_prefix": "stb3_",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "rand",
"insert_mode": "sml",
"line_protocol": "telnet",
"childtable_limit": 0,
"childtable_offset": 0,
"insert_rows": 2,
"insert_interval": 0,
"interlace_rows": 0,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "now",
"sample_file": "",
"use_sample_ts": "no",
"tags_file": "",
"columns": [{"type": "SMALLINT"}],
"tags": [{"type": "INT"}, {"type": "BIGINT"}, {"type": "FLOAT"}, {"type": "DOUBLE"}, {"type": "SMALLINT"}, {"type": "TINYINT"}, {"type": "BOOL"}, {"type": "NCHAR","len": 17, "count":1}, {"type": "BINARY", "len": 19, "count":1}]
},{
"name": "rest",
"child_table_exists":"no",
"childtable_count": 8,
"childtable_prefix": "stb4_",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "rand",
"insert_mode": "rest",
"line_protocol": "json",
"childtable_limit": 0,
"childtable_offset": 0,
"insert_rows": 2,
"insert_interval": 0,
"interlace_rows": 0,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "now",
"sample_file": "",
"use_sample_ts": "no",
"tags_file": "",
"columns": [{"type": "INT"}],
"tags": [{"type": "INT"}, {"type": "BIGINT"}, {"type": "FLOAT"}, {"type": "DOUBLE"}, {"type": "SMALLINT"}, {"type": "TINYINT"}, {"type": "BOOL"}, {"type": "NCHAR","len": 17, "count":1}, {"type": "BINARY", "len": 19, "count":1}]
},{
"name": "stmt",
"child_table_exists":"no",
"childtable_count": 8,
"childtable_prefix": "stb5_",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "rand",
"insert_mode": "stmt",
"line_protocol": "json",
"childtable_limit": 0,
"childtable_offset": 0,
"insert_rows": 2,
"insert_interval": 0,
"interlace_rows": 0,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "now",
"sample_file": "",
"use_sample_ts": "no",
"tags_file": "",
"columns": [{"type": "BIGINT"}],
"tags": [{"type": "INT"}, {"type": "BIGINT"}, {"type": "FLOAT"}, {"type": "DOUBLE"}, {"type": "SMALLINT"}, {"type": "TINYINT"}, {"type": "BOOL"}, {"type": "NCHAR","len": 17, "count":1}, {"type": "BINARY", "len": 19, "count":1}]
},{
"name": "sml_rest_json",
"child_table_exists":"no",
"childtable_count": 8,
"childtable_prefix": "stb6_",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "rand",
"insert_mode": "sml-rest",
"line_protocol": "json",
"childtable_limit": 0,
"childtable_offset": 0,
"insert_rows": 2,
"insert_interval": 0,
"interlace_rows": 0,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "now",
"sample_file": "",
"use_sample_ts": "no",
"tags_file": "",
"columns": [{"type": "FLOAT"}],
"tags": [{"type": "INT"}, {"type": "BIGINT"}, {"type": "FLOAT"}, {"type": "DOUBLE"}, {"type": "SMALLINT"}, {"type": "TINYINT"}, {"type": "BOOL"}, {"type": "NCHAR","len": 17, "count":1}, {"type": "BINARY", "len": 19, "count":1}]
},{
"name": "sml_rest_line",
"child_table_exists":"no",
"childtable_count": 8,
"childtable_prefix": "stb7_",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "rand",
"insert_mode": "sml-rest",
"line_protocol": "line",
"childtable_limit": 0,
"childtable_offset": 0,
"insert_rows": 2,
"insert_interval": 0,
"interlace_rows": 0,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "now",
"sample_file": "",
"use_sample_ts": "no",
"tags_file": "",
"columns": [{"type": "DOUBLE"}],
"tags": [{"type": "INT"}, {"type": "BIGINT"}, {"type": "FLOAT"}, {"type": "DOUBLE"}, {"type": "SMALLINT"}, {"type": "TINYINT"}, {"type": "BOOL"}, {"type": "NCHAR","len": 17, "count":1}, {"type": "BINARY", "len": 19, "count":1}]
},{
"name": "sml_rest_telnet",
"child_table_exists":"no",
"childtable_count": 8,
"childtable_prefix": "stb8_",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "rand",
"insert_mode": "sml-rest",
"line_protocol": "telnet",
"childtable_limit": 0,
"childtable_offset": 0,
"insert_rows": 2,
"insert_interval": 0,
"interlace_rows": 0,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "now",
"sample_file": "",
"use_sample_ts": "no",
"tags_file": "",
"columns": [{"type": "BINARY", "len": 8}],
"tags": [{"type": "INT"}, {"type": "BIGINT"}, {"type": "FLOAT"}, {"type": "DOUBLE"}, {"type": "SMALLINT"}, {"type": "TINYINT"}, {"type": "BOOL"}, {"type": "NCHAR","len": 17, "count":1}, {"type": "BINARY", "len": 19, "count":1}]
},{
"name": "taosc",
"child_table_exists":"no",
"childtable_count": 8,
"childtable_prefix": "stb9_",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "rand",
"insert_mode": "taosc",
"line_protocol": "json",
"childtable_limit": 0,
"childtable_offset": 0,
"insert_rows": 2,
"insert_interval": 0,
"interlace_rows": 0,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "now",
"sample_file": "",
"use_sample_ts": "no",
"tags_file": "",
"columns": [{"type": "NCHAR", "len": 8}],
"tags": [{"type": "INT"}, {"type": "BIGINT"}, {"type": "FLOAT"}, {"type": "DOUBLE"}, {"type": "SMALLINT"}, {"type": "TINYINT"}, {"type": "BOOL"}, {"type": "NCHAR","len": 17, "count":1}, {"type": "BINARY", "len": 19, "count":1}]
}]
}]
}

View File

@ -43,6 +43,7 @@ class TDTestCase:
delete from deldata.ct1;
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a );
flush database deldata;'''
def checkProcessPid(self,processName):
i=0
while i<60:
@ -110,7 +111,8 @@ class TDTestCase:
else:
print(f"{packageName} has been exists")
os.system(f" cd {packagePath} && tar xvf {packageName} && cd {packageTPath} && ./install.sh -e no " )
tdDnodes.stop(1)
os.system(f"pkill -9 taosd" )
print(f"start taosd: rm -rf {dataPath}/* && nohup /usr/bin/taosd -c {cPath} & ")
os.system(f"rm -rf {dataPath}/* && nohup /usr/bin/taosd -c {cPath} & " )
os.system(f"killall taosadapter" )
@ -165,32 +167,31 @@ class TDTestCase:
cPath = self.getCfgPath()
dbname = "test"
stb = f"{dbname}.meters"
# package_type = "enterprise"
package_type = "community"
self.installTaosd(bPath,cPath,package_type)
# os.system(f"echo 'debugFlag 143' >> {cPath}/taos.cfg ")
tableNumbers=100
recordNumbers1=100
recordNumbers1=1000
recordNumbers2=1000
# tdsqlF=tdCom.newTdSql()
# print(tdsqlF)
# tdsqlF.query(f"SELECT SERVER_VERSION();")
# print(tdsqlF.query(f"SELECT SERVER_VERSION();"))
# oldServerVersion=tdsqlF.queryResult[0][0]
# tdLog.info(f"Base server version is {oldServerVersion}")
# tdsqlF.query(f"SELECT CLIENT_VERSION();")
# # the oldClientVersion can't be updated in the same python process,so the version is new compiled verison
# oldClientVersion=tdsqlF.queryResult[0][0]
# tdLog.info(f"Base client version is {oldClientVersion}")
# baseVersion = "3.0.1.8"
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}")
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -v 1 -y ")
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -v 1 -y ")
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -v 1 -O 5 -y ")
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -v 1 -O 5 -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'alter database test keep 365000 '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'alter database test cachemodel \"both\" '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select last(*) from test.meters '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database test '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s \"insert into test.d1 values (now+1s, 11, 190, 0.21), (now+2s, 11, 190, 0.21), (now+3s, 11, 190, 0.21), ('2015-07-14 08:39:59.001', 11, 190, 0.21), ('2032-08-14 08:39:59.001 ', 11, 190, 0.21) test.d3 values (now+6s, 11, 190, 0.21), (now+7s, 11, 190, 0.21), (now+8s, 11, 190, 0.21), ('2033-07-14 08:39:59.000', 119, 191, 0.25) test.d3 (ts) values ('2033-07-14 08:39:58.000');\"")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select last(*) from test.meters '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database test '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s \"insert into test.d1 values (now+11s, 11, 190, 0.21), (now+12s, 11, 190, 0.21), (now+13s, 11, 190, 0.21), (now+14s, 11, 190, 0.21), (now+15s, 11, 190, 0.21) test.d3 values (now+16s, 11, 190, 0.21), (now+17s, 11, 190, 0.21), (now+18s, 11, 190, 0.21), (now+19s, 119, 191, 0.25) test.d3 (ts) values (now+20s);\"")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/com_alltypedata.json -y")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database curdb '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'alter database curdb cachemodel \"both\" '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select count(*) from curdb.meters '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select last(*) from curdb.meters '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select sum(fc) from curdb.meters '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select avg(ic) from curdb.meters '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select min(ui) from curdb.meters '")
@ -211,7 +212,8 @@ class TDTestCase:
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {stable_topic} as stable test.meters where tbname like \\"d3\\";" ')
select_topic = "select_test_meters_topic"
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {select_topic} as select current,voltage,phase from test.meters where voltage >= 170;" ')
topic_select_sql = "select current,voltage,phase from test.meters where voltage >= 10;"
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {select_topic} as {topic_select_sql}" ')
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show topics;" ')
os.system(f" /usr/bin/taosadapter --version " )
@ -243,37 +245,12 @@ class TDTestCase:
break
consumer.close()
# consumer_dict2 = {
# "group.id": "g2",
# "td.connect.websocket.scheme": "ws",
# "td.connect.user": "root",
# "td.connect.pass": "taosdata",
# "auto.offset.reset": "earliest",
# "enable.auto.commit": "false",
# }
# consumer = taosws.Consumer(consumer_dict2)
# try:
# consumer.subscribe([db_topic,stable_topic])
# except TmqError:
# tdLog.exit(f"subscribe error")
# first_consumer_rows = 0
# while True:
# message = consumer.poll(timeout=1.0)
# if message:
# for block in message:
# first_consumer_rows += block.nrows()
# else:
# tdLog.notice("message is null and break")
# break
# consumer.commit(message)
# tdLog.debug(f"topic:{select_topic} ,first consumer rows is {first_consumer_rows} in old version")
# break
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/all_insertmode_alltypes.json -y")
# os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql")
# add deleted data
@ -285,9 +262,10 @@ class TDTestCase:
if os.system(cmd) == 0:
raise Exception("failed to execute system command. cmd: %s" % cmd)
os.system("pkill taosd") # make sure all the data are saved in disk.
os.system("pkill -9 taosd") # make sure all the data are saved in disk.
os.system("pkill -9 taos")
self.checkProcessPid("taosd")
os.system("pkill taosadapter") # make sure all the data are saved in disk.
os.system("pkill -9 taosadapter") # make sure all the data are saved in disk.
self.checkProcessPid("taosadapter")
tdLog.printNoPrefix("==========step2:update new version ")
@ -309,8 +287,28 @@ class TDTestCase:
tdLog.info(f"New client version is {nowClientVersion}")
tdLog.printNoPrefix(f"==========step3:prepare and check data in new version-{nowServerVersion}")
tdsql.query(f"select last(*) from curdb.meters")
tdLog.info(tdsql.queryResult)
tdsql.query(f"select * from db_all_insert_mode.sml_json")
tdsql.checkRows(16)
tdsql.query(f"select * from db_all_insert_mode.sml_line")
tdsql.checkRows(16)
tdsql.query(f"select * from db_all_insert_mode.sml_telnet")
tdsql.checkRows(16)
tdsql.query(f"select * from db_all_insert_mode.rest")
tdsql.checkRows(16)
tdsql.query(f"select * from db_all_insert_mode.stmt")
tdsql.checkRows(16)
tdsql.query(f"select * from db_all_insert_mode.sml_rest_json")
tdsql.checkRows(16)
tdsql.query(f"select * from db_all_insert_mode.sml_rest_line")
tdsql.checkRows(16)
tdsql.query(f"select * from db_all_insert_mode.sml_rest_telnet")
tdsql.checkRows(16)
tdsql.query(f"select count(*) from {stb}")
tdsql.checkData(0,0,tableNumbers*recordNumbers1)
tdsql.checkData(0,0,tableNumbers*recordNumbers1+20)
tdsql.query("show streams;")
tdsql.checkRows(2)
@ -381,9 +379,78 @@ class TDTestCase:
tdLog.exit("The unordered list is not the same as the ordered list.")
# check database test and last
# first check
tdsql.query(f"select last(*) from test.meters group by tbname")
tdLog.info(tdsql.queryResult)
# tdsql.checkRows(tableNumbers)
tdsql.query(f"select last_row(*) from test.meters group by tbname")
tdLog.info(tdsql.queryResult)
# tdsql.checkRows(tableNumbers)
tdsql.query(f"select last_row(*) from test.meters partition by tbname")
tdLog.info(tdsql.queryResult)
# tdsql.checkRows(tableNumbers)
tdsql.query(f"select last(*) from test.meters")
tdLog.info(tdsql.queryResult)
tdsql.checkData(0,0,"2033-07-14 08:39:59.000")
tdsql.checkData(0,1,119)
tdsql.checkData(0,2,191)
tdsql.checkData(0,3,0.25)
tdsql.query(f"select last_row(*) from test.meters")
tdLog.info(tdsql.queryResult)
tdsql.checkData(0,0,"2033-07-14 08:39:59.000")
tdsql.checkData(0,1,119)
tdsql.checkData(0,2,191)
tdsql.checkData(0,3,0.25)
tdsql.query(f"select last(*) from test.d1")
tdLog.info(tdsql.queryResult)
tdsql.checkData(0,0,"2032-08-14 08:39:59.001")
tdsql.checkData(0,1,11)
tdsql.checkData(0,2,190)
tdsql.checkData(0,3,0.21)
# update data and check
tdsql.execute("insert into test.d2 values ('2033-07-14 08:39:59.002', 139, 182, 1.10) (now+2s, 12, 191, 0.22) test.d2 (ts) values ('2033-07-14 08:39:59.003');")
tdsql.execute("insert into test.d2 values (now+5s, 4.3, 104, 0.4);")
tdsql.query(f"select last(*) from test.meters")
tdLog.info(tdsql.queryResult)
tdsql.checkData(0,0,"2033-07-14 08:39:59.003")
tdsql.checkData(0,1,139)
tdsql.checkData(0,2,182)
tdsql.checkData(0,3,1.10)
# repeately insert data and check
tdsql.execute("insert into test.d1 values (now+1s, 11, 190, 0.21) (now+2s, 12, 191, 0.22) ('2033-07-14 08:40:01.001', 16, 180, 0.53);")
tdsql.query(f"select last(*) from test.d1")
tdLog.info(tdsql.queryResult)
tdsql.checkData(0,0,"2033-07-14 08:40:01.001")
tdsql.checkData(0,1,16)
tdsql.checkData(0,2,180)
tdsql.checkData(0,3,0.53)
tdsql.query(f"select last(*) from test.meters")
tdLog.info(tdsql.queryResult)
tdsql.checkData(0,0,"2033-07-14 08:40:01.001")
tdsql.checkData(0,1,16)
tdsql.checkData(0,2,180)
tdsql.checkData(0,3,0.53)
tdsql.query(f"select last_row(*) from test.meters")
tdLog.info(tdsql.queryResult)
tdsql.checkData(0,0,"2033-07-14 08:40:01.001")
tdsql.checkData(0,1,16)
tdsql.checkData(0,2,180)
tdsql.checkData(0,3,0.53)
# check tmq
tdsql.execute("insert into test.d80 values (now+1s, 11, 190, 0.21);")
tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);")
conn = taos.connect()
consumer = Consumer(
@ -408,7 +475,7 @@ class TDTestCase:
print("consumer has completed and break")
break
consumer.close()
tdsql.query("select current,voltage,phase from test.meters where voltage >= 170;")
tdsql.query(f"{topic_select_sql}")
all_rows = tdsql.queryRows
if consumer_rows < all_rows - first_consumer_rows :
tdLog.exit(f"consumer rows is {consumer_rows}, less than {all_rows - first_consumer_rows}")

View File

@ -31,8 +31,8 @@ class LegalDataType(Enum):
UINT = 'INT UNSIGNED'
BIGINT = 'BIGINT'
UBIGINT = 'BIGINT UNSIGNED'
VARCHAR = 'VARCHAR(100)'
BINARY = 'BINARY(100)'
VARCHAR = 'VARCHAR(10000)'
BINARY = 'BINARY(10000)'
class TableType(Enum):

View File

@ -22,7 +22,7 @@ class TDTestCase:
self.vgroups = 4
self.ctbNum = 10
self.rowsPerTbl = 10000
self.duraion = '1h'
self.duraion = '1d'
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
@ -33,7 +33,7 @@ class TDTestCase:
if dropFlag == 1:
tsql.execute("drop database if exists %s"%(dbName))
tsql.execute("create database if not exists %s vgroups %d replica %d duration %s"%(dbName, vgroups, replica, duration))
tsql.execute("create database if not exists %s vgroups %d replica %d duration %s stt_trigger 1"%(dbName, vgroups, replica, duration))
tdLog.debug("complete to create database %s"%(dbName))
return
@ -266,11 +266,11 @@ class TDTestCase:
#'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30m)' % (col_name, col_name),
#'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1h)' % (col_name, col_name),
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1s)' % (col_name, col_name, col_name),
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30d)' % (col_name, col_name, col_name),
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30s)' % (col_name, col_name, col_name),
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1m)' % (col_name, col_name, col_name),
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30m)' % (col_name, col_name, col_name),
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1h)' % (col_name, col_name, col_name),
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1h)' % (col_name, col_name, col_name),
'select _wstart as ts, count(*), tbname as a, %s from meters partition by %s, tbname interval(1s)' % (col_name, col_name),
'select _wstart as ts, count(*), t1 as a, %s from meters partition by %s, t1 interval(1s)' % (col_name, col_name),
@ -317,6 +317,7 @@ class TDTestCase:
def run(self):
self.prepareTestEnv()
tdSql.execute('flush database test')
#time.sleep(99999999)
self.test_sort_for_partition_hint()
self.test_sort_for_partition_res()

View File

@ -87,7 +87,7 @@ class TDTestCase:
tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)')
tdSql.execute("create stream stream1 fill_history 1 into sta subtable(concat('new-', tname)) AS SELECT "
tdSql.execute("create stream stream1 fill_history 1 into sta subtable(concat('nee.w-', tname)) AS SELECT "
"_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True)
tdSql.execute("create stream stream2 fill_history 1 into stb subtable(concat('new-', tname)) AS SELECT "
@ -97,25 +97,25 @@ class TDTestCase:
tdSql.query("select * from sta")
tdSql.checkRows(3)
tdSql.query("select tbname from sta order by tbname")
if not tdSql.getData(0, 0).startswith('new-t1_1.d1.sta_'):
if not tdSql.getData(0, 0).startswith('nee_w-t1_sta_'):
tdLog.exit("error1")
if not tdSql.getData(1, 0).startswith('new-t2_1.d1.sta_'):
if not tdSql.getData(1, 0).startswith('nee_w-t2_sta_'):
tdLog.exit("error2")
if not tdSql.getData(2, 0).startswith('new-t3_1.d1.sta_'):
if not tdSql.getData(2, 0).startswith('nee_w-t3_sta_'):
tdLog.exit("error3")
tdSql.query("select * from stb")
tdSql.checkRows(3)
tdSql.query("select tbname from stb order by tbname")
if not tdSql.getData(0, 0).startswith('new-t1_1.d1.stb_'):
if not tdSql.getData(0, 0).startswith('new-t1_stb_'):
tdLog.exit("error4")
if not tdSql.getData(1, 0).startswith('new-t2_1.d1.stb_'):
if not tdSql.getData(1, 0).startswith('new-t2_stb_'):
tdLog.exit("error5")
if not tdSql.getData(2, 0).startswith('new-t3_1.d1.stb_'):
if not tdSql.getData(2, 0).startswith('new-t3_stb_'):
tdLog.exit("error6")
# run

View File

@ -276,12 +276,16 @@ int32_t simExecuteExpression(SScript *script, char *exp) {
if (op1[0] == '=') {
strcpy(simGetVariable(script, var1 + 1, var1Len - 1), t3);
} else if (op1[0] == '<') {
val0 = atoi(t0);
val1 = atoi(t3);
int64_t val0 = atoll(t0);
int64_t val1 = atoll(t3);
// val0 = atoi(t0);
// val1 = atoi(t3);
if (val0 >= val1) result = -1;
} else if (op1[0] == '>') {
val0 = atoi(t0);
val1 = atoi(t3);
int64_t val0 = atoll(t0);
int64_t val1 = atoll(t3);
// val0 = atoi(t0);
// val1 = atoi(t3);
if (val0 <= val1) result = -1;
}
} else {
@ -381,10 +385,8 @@ bool simExecuteRunBackCmd(SScript *script, char *option) {
void simReplaceDirSep(char *buf) {
#ifdef WINDOWS
int i = 0;
while(buf[i] != '\0')
{
if(buf[i] == '/')
{
while (buf[i] != '\0') {
if (buf[i] == '/') {
buf[i] = '\\';
}
i++;