Merge branch 'develop' into coverity_scan
This commit is contained in:
commit
9a6a7e6d54
|
@ -791,12 +791,12 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
|
|||
sToken = tStrGetToken(sql, &index, false, 0, NULL);
|
||||
sql += index;
|
||||
|
||||
STagData *pTag = (STagData *)pCmd->payload;
|
||||
tscAllocPayload(pCmd, sizeof(STagData));
|
||||
STagData *pTag = (STagData *) pCmd->payload;
|
||||
|
||||
memset(pTag, 0, sizeof(STagData));
|
||||
|
||||
/*
|
||||
* the source super table is moved to the secondary position of the pTableMetaInfo list
|
||||
*/
|
||||
//the source super table is moved to the secondary position of the pTableMetaInfo list
|
||||
if (pQueryInfo->numOfTables < 2) {
|
||||
tscAddEmptyMetaInfo(pQueryInfo);
|
||||
}
|
||||
|
@ -897,9 +897,8 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
|
|||
index = 0;
|
||||
sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes);
|
||||
sql += index;
|
||||
if (sToken.n == 0) {
|
||||
break;
|
||||
} else if (sToken.type == TK_RP) {
|
||||
|
||||
if (sToken.n == 0 || sToken.type == TK_RP) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -913,11 +912,6 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if ((pTagSchema[colIndex].type == TSDB_DATA_TYPE_BINARY || pTagSchema[colIndex].type == TSDB_DATA_TYPE_NCHAR) &&
|
||||
sToken.n > pTagSchema[colIndex].bytes) {
|
||||
return tscInvalidSQLErrMsg(pCmd->payload, "string too long", sToken.z);
|
||||
}
|
||||
}
|
||||
|
||||
index = 0;
|
||||
|
@ -1041,8 +1035,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
|
|||
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
}
|
||||
|
||||
// TODO: 2048 is added because TSDB_MAX_TAGS_LEN now is 65536, but TSDB_PAYLOAD_SIZE is 65380
|
||||
if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE + 2048)) != TSDB_CODE_SUCCESS) {
|
||||
if ((code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -824,7 +824,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
tscError("%p: fseek failed: %s", pSql, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
if (fread(pMsg, pBlockInfo->compLen, 1, pQueryInfo->tsBuf->f) != pBlockInfo->compLen) {
|
||||
|
||||
size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
|
||||
if (s != pBlockInfo->compLen) {
|
||||
int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
|
||||
tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
|
||||
return code;
|
||||
|
|
|
@ -308,10 +308,8 @@ void tscSaveSubscriptionProgress(void* sub) {
|
|||
|
||||
char path[256];
|
||||
sprintf(path, "%s/subscribe", tsDataDir);
|
||||
if (access(path, 0) != 0) {
|
||||
if (mkdir(path, 0777) != 0 && errno != EEXIST) {
|
||||
tscError("failed to create subscribe dir: %s", path);
|
||||
}
|
||||
if (tmkdir(path, 0777) != 0) {
|
||||
tscError("failed to create subscribe dir: %s", path);
|
||||
}
|
||||
|
||||
sprintf(path, "%s/subscribe/%s", tsDataDir, pSub->topic);
|
||||
|
|
|
@ -608,6 +608,9 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
|
|||
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
// the expanded size when a row data is converted to SDataRow format
|
||||
const int32_t MAX_EXPAND_SIZE = TD_DATA_ROW_HEAD_SIZE + TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
|
||||
|
||||
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
||||
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
|
||||
|
||||
|
@ -627,7 +630,9 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int64_t destSize = dataBuf->size + pOneTableBlock->size + pOneTableBlock->size*sizeof(int32_t)*2;
|
||||
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
|
||||
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * MAX_EXPAND_SIZE;
|
||||
|
||||
if (dataBuf->nAllocSize < destSize) {
|
||||
while (dataBuf->nAllocSize < destSize) {
|
||||
dataBuf->nAllocSize = dataBuf->nAllocSize * 1.5;
|
||||
|
@ -648,26 +653,30 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
|
|||
}
|
||||
}
|
||||
|
||||
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
|
||||
tscSortRemoveDataBlockDupRows(pOneTableBlock);
|
||||
|
||||
char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
|
||||
|
||||
tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
|
||||
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
|
||||
|
||||
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + sizeof(int32_t) * 2);
|
||||
|
||||
|
||||
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + MAX_EXPAND_SIZE);
|
||||
|
||||
pBlocks->tid = htonl(pBlocks->tid);
|
||||
pBlocks->uid = htobe64(pBlocks->uid);
|
||||
pBlocks->sversion = htonl(pBlocks->sversion);
|
||||
pBlocks->numOfRows = htons(pBlocks->numOfRows);
|
||||
|
||||
pBlocks->len = htonl(len);
|
||||
|
||||
|
||||
// erase the empty space reserved for binary data
|
||||
len = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock);
|
||||
dataBuf->size += (len + sizeof(SSubmitBlk));
|
||||
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock);
|
||||
assert(finalLen <= len);
|
||||
|
||||
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
|
||||
assert(dataBuf->size <= dataBuf->nAllocSize);
|
||||
|
||||
// the length does not include the SSubmitBlk structure
|
||||
pBlocks->len = htonl(finalLen);
|
||||
|
||||
dataBuf->numOfTables += 1;
|
||||
}
|
||||
|
||||
|
|
|
@ -911,21 +911,27 @@ int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp) {
|
|||
|
||||
(void)lseek(fd, 0, SEEK_SET);
|
||||
|
||||
STableRecord tableInfo;
|
||||
//STableRecord tableInfo;
|
||||
char tableName[TSDB_TABLE_NAME_LEN] ;
|
||||
char metricName[TSDB_TABLE_NAME_LEN];
|
||||
while (1) {
|
||||
memset(&tableInfo, 0, sizeof(STableRecord));
|
||||
ssize_t ret;
|
||||
while (1) {
|
||||
//memset(&tableInfo, 0, sizeof(STableRecord));
|
||||
memset(tableName, 0, TSDB_TABLE_NAME_LEN);
|
||||
memset(metricName, 0, TSDB_TABLE_NAME_LEN);
|
||||
ssize_t ret = read(fd, &tableInfo, sizeof(STableRecord));
|
||||
//ssize_t ret = read(fd, &tableInfo, sizeof(STableRecord));
|
||||
//if (ret <= 0) break;
|
||||
ret = read(fd, tableName, TSDB_TABLE_NAME_LEN);
|
||||
if (ret <= 0) break;
|
||||
|
||||
ret = read(fd, metricName, TSDB_TABLE_NAME_LEN);
|
||||
if (ret <= 0) break;
|
||||
|
||||
//tableInfo.name[sizeof(tableInfo.name) - 1] = 0;
|
||||
//tableInfo.metric[sizeof(tableInfo.metric) - 1] = 0;
|
||||
//taosDumpTable(tableInfo.name, tableInfo.metric, arguments, fp);
|
||||
tstrncpy(tableName, tableInfo.name, TSDB_TABLE_NAME_LEN-1);
|
||||
tstrncpy(metricName, tableInfo.metric, TSDB_TABLE_NAME_LEN-1);
|
||||
//tstrncpy(tableName, tableInfo.name, TSDB_TABLE_NAME_LEN-1);
|
||||
//tstrncpy(metricName, tableInfo.metric, TSDB_TABLE_NAME_LEN-1);
|
||||
taosDumpTable(tableName, metricName, arguments, fp);
|
||||
}
|
||||
|
||||
|
|
|
@ -63,7 +63,6 @@ typedef struct _SSdbTable {
|
|||
int32_t (*encodeFp)(SSdbOper *pOper);
|
||||
int32_t (*destroyFp)(SSdbOper *pOper);
|
||||
int32_t (*restoredFp)();
|
||||
pthread_mutex_t mutex;
|
||||
} SSdbTable;
|
||||
|
||||
typedef struct {
|
||||
|
@ -429,24 +428,18 @@ static SSdbRow *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) {
|
|||
|
||||
void *sdbGetRow(void *handle, void *key) {
|
||||
SSdbTable *pTable = (SSdbTable *)handle;
|
||||
SSdbRow * pMeta;
|
||||
|
||||
if (handle == NULL) return NULL;
|
||||
|
||||
pthread_mutex_lock(&pTable->mutex);
|
||||
|
||||
int32_t keySize = sizeof(int32_t);
|
||||
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
|
||||
keySize = strlen((char *)key);
|
||||
}
|
||||
pMeta = taosHashGet(pTable->iHandle, key, keySize);
|
||||
|
||||
if (pMeta) sdbIncRef(pTable, pMeta->row);
|
||||
pthread_mutex_unlock(&pTable->mutex);
|
||||
|
||||
if (pMeta == NULL) return NULL;
|
||||
|
||||
return pMeta->row;
|
||||
|
||||
SSdbRow *pMeta = taosHashGet(pTable->iHandle, key, keySize);
|
||||
if (pMeta) {
|
||||
sdbIncRef(pTable, pMeta->row);
|
||||
return pMeta->row;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) {
|
||||
|
@ -458,8 +451,6 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
|
|||
rowMeta.rowSize = pOper->rowSize;
|
||||
rowMeta.row = pOper->pObj;
|
||||
|
||||
pthread_mutex_lock(&pTable->mutex);
|
||||
|
||||
void * key = sdbGetObjKey(pTable, pOper->pObj);
|
||||
int32_t keySize = sizeof(int32_t);
|
||||
|
||||
|
@ -470,16 +461,14 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
|
|||
taosHashPut(pTable->iHandle, key, keySize, &rowMeta, sizeof(SSdbRow));
|
||||
|
||||
sdbIncRef(pTable, pOper->pObj);
|
||||
pTable->numOfRows++;
|
||||
atomic_add_fetch_32(&pTable->numOfRows, 1);
|
||||
|
||||
if (pTable->keyType == SDB_KEY_AUTO) {
|
||||
pTable->autoIndex = MAX(pTable->autoIndex, *((uint32_t *)pOper->pObj));
|
||||
} else {
|
||||
pTable->autoIndex++;
|
||||
atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&pTable->mutex);
|
||||
|
||||
sdbTrace("table:%s, insert record:%s to hash, rowSize:%d numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
|
||||
sdbGetKeyStrFromObj(pTable, pOper->pObj), pOper->rowSize, pTable->numOfRows, sdbGetVersion());
|
||||
|
||||
|
@ -490,20 +479,15 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
|
|||
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
|
||||
(*pTable->deleteFp)(pOper);
|
||||
|
||||
pthread_mutex_lock(&pTable->mutex);
|
||||
|
||||
void * key = sdbGetObjKey(pTable, pOper->pObj);
|
||||
int32_t keySize = sizeof(int32_t);
|
||||
|
||||
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
|
||||
keySize = strlen((char *)key);
|
||||
}
|
||||
|
||||
taosHashRemove(pTable->iHandle, key, keySize);
|
||||
|
||||
pTable->numOfRows--;
|
||||
pthread_mutex_unlock(&pTable->mutex);
|
||||
|
||||
atomic_sub_fetch_32(&pTable->numOfRows, 1);
|
||||
|
||||
sdbTrace("table:%s, delete record:%s from hash, numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
|
||||
sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion());
|
||||
|
||||
|
@ -608,14 +592,12 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
|
|||
}
|
||||
|
||||
if (pTable->keyType == SDB_KEY_AUTO) {
|
||||
pthread_mutex_lock(&pTable->mutex);
|
||||
*((uint32_t *)pOper->pObj) = ++pTable->autoIndex;
|
||||
*((uint32_t *)pOper->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
|
||||
// let vgId increase from 2
|
||||
if (pTable->autoIndex == 1 && strcmp(pTable->tableName, "vgroups") == 0) {
|
||||
*((uint32_t *)pOper->pObj) = ++pTable->autoIndex;
|
||||
*((uint32_t *)pOper->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
}
|
||||
pthread_mutex_unlock(&pTable->mutex);
|
||||
}
|
||||
|
||||
int32_t code = sdbInsertHash(pTable, pOper);
|
||||
|
@ -805,8 +787,6 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
|
|||
}
|
||||
pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true);
|
||||
|
||||
pthread_mutex_init(&pTable->mutex, NULL);
|
||||
|
||||
tsSdbObj.numOfTables++;
|
||||
tsSdbObj.tableList[pTable->tableId] = pTable;
|
||||
return pTable;
|
||||
|
@ -835,8 +815,6 @@ void sdbCloseTable(void *handle) {
|
|||
taosHashDestroyIter(pIter);
|
||||
taosHashCleanup(pTable->iHandle);
|
||||
|
||||
pthread_mutex_destroy(&pTable->mutex);
|
||||
|
||||
sdbTrace("table:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbObj.numOfTables);
|
||||
free(pTable);
|
||||
}
|
||||
|
|
|
@ -780,6 +780,16 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
|
||||
if (pTable != NULL) {
|
||||
mLPrint("app:%p:%p, stable:%s, create result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
||||
tstrerror(code));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
|
||||
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
|
||||
|
||||
|
@ -819,25 +829,27 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
assert(tschema[col].type >= TSDB_DATA_TYPE_BOOL && tschema[col].type <= TSDB_DATA_TYPE_NCHAR);
|
||||
}
|
||||
|
||||
pMsg->pTable = (STableObj *)pStable;
|
||||
mnodeIncTableRef(pMsg->pTable);
|
||||
|
||||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
.rowSize = sizeof(SSuperTableObj) + schemaSize,
|
||||
.pMsg = pMsg
|
||||
.pMsg = pMsg,
|
||||
.cb = mnodeCreateSuperTableCb
|
||||
};
|
||||
|
||||
int32_t code = sdbInsertRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mnodeDestroySuperTable(pStable);
|
||||
pMsg->pTable = NULL;
|
||||
mError("app:%p:%p, table:%s, failed to create, sdb error", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
|
||||
return code;
|
||||
} else {
|
||||
mLPrint("app:%p:%p, table:%s, is created, tags:%d fields:%d", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
pStable->numOfTags, pStable->numOfColumns);
|
||||
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
|
||||
|
@ -1535,10 +1547,16 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO
|
|||
}
|
||||
|
||||
static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
|
||||
if (pTable != NULL) {
|
||||
mTrace("app:%p:%p, table:%s, create table in id:%d, uid:%" PRIu64 ", result:%s", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code));
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) return code;
|
||||
|
||||
|
||||
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, (SChildTableObj *)pMsg->pTable);
|
||||
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable);
|
||||
if (pMDCreate == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
@ -1639,16 +1657,13 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
|
|||
|
||||
int32_t code = sdbInsertRow(&desc);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
free(pTable);
|
||||
mnodeDestroyChildTable(pTable);
|
||||
pMsg->pTable = NULL;
|
||||
mError("app:%p:%p, table:%s, update sdb error, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId,
|
||||
tstrerror(code));
|
||||
pMsg->pTable = NULL;
|
||||
return code;
|
||||
} else {
|
||||
mTrace("app:%p:%p, table:%s, create table in vgroup:%d, id:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
|
||||
pTable->info.tableId, pVgroup->vgId, pTable->sid, pTable->uid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
|
||||
|
|
|
@ -5153,9 +5153,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
|||
}
|
||||
|
||||
if (!validateQuerySourceCols(pQueryMsg, *pExpr)) {
|
||||
tfree(*pExpr);
|
||||
|
||||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
goto _cleanup;
|
||||
}
|
||||
|
||||
pMsg = createTableIdList(pQueryMsg, pMsg, pTableIdList);
|
||||
|
@ -5227,8 +5225,17 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
|||
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
|
||||
pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime,
|
||||
pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset);
|
||||
|
||||
return 0;
|
||||
|
||||
_cleanup:
|
||||
tfree(*pExpr);
|
||||
taosArrayDestroy(*pTableIdList);
|
||||
*pTableIdList = NULL;
|
||||
tfree(*tbnameCond);
|
||||
tfree(*groupbyCols);
|
||||
tfree(*tagCols);
|
||||
tfree(*tagCond);
|
||||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
|
||||
static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
|
||||
|
@ -5494,6 +5501,8 @@ static int compareTableIdInfo(const void* a, const void* b) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void freeQInfo(SQInfo *pQInfo);
|
||||
|
||||
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
|
||||
STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols) {
|
||||
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
|
||||
|
@ -5634,22 +5643,27 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
|||
return pQInfo;
|
||||
|
||||
_cleanup:
|
||||
tfree(pQuery->fillVal);
|
||||
//tfree(pQuery->fillVal);
|
||||
|
||||
if (pQuery->sdata != NULL) {
|
||||
for (int16_t col = 0; col < pQuery->numOfOutput; ++col) {
|
||||
tfree(pQuery->sdata[col]);
|
||||
}
|
||||
}
|
||||
//if (pQuery->sdata != NULL) {
|
||||
// for (int16_t col = 0; col < pQuery->numOfOutput; ++col) {
|
||||
// tfree(pQuery->sdata[col]);
|
||||
// }
|
||||
//}
|
||||
|
||||
tfree(pQuery->sdata);
|
||||
tfree(pQuery->pFilterInfo);
|
||||
tfree(pQuery->colList);
|
||||
//
|
||||
//tfree(pQuery->sdata);
|
||||
//tfree(pQuery->pFilterInfo);
|
||||
//tfree(pQuery->colList);
|
||||
|
||||
tfree(pExprs);
|
||||
tfree(pGroupbyExpr);
|
||||
//tfree(pExprs);
|
||||
//tfree(pGroupbyExpr);
|
||||
|
||||
tfree(pQInfo);
|
||||
//taosArrayDestroy(pQInfo->arrTableIdInfo);
|
||||
//tsdbDestoryTableGroup(&pQInfo->tableGroupInfo);
|
||||
//
|
||||
//tfree(pQInfo);
|
||||
freeQInfo(pQInfo);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
@ -5668,7 +5682,6 @@ static bool isValidQInfo(void *param) {
|
|||
return (sig == (uint64_t)pQInfo);
|
||||
}
|
||||
|
||||
static void freeQInfo(SQInfo *pQInfo);
|
||||
|
||||
static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -5869,6 +5882,8 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
|
|||
SSqlFuncMsg **pExprMsg = NULL;
|
||||
SColIndex * pGroupColIndex = NULL;
|
||||
SColumnInfo* pTagColumnInfo = NULL;
|
||||
SExprInfo *pExprs = NULL;
|
||||
SSqlGroupbyExpr *pGroupbyExpr = NULL;
|
||||
|
||||
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo)) !=
|
||||
TSDB_CODE_SUCCESS) {
|
||||
|
@ -5887,12 +5902,12 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
|
|||
goto _over;
|
||||
}
|
||||
|
||||
SExprInfo *pExprs = NULL;
|
||||
if ((code = createQFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
|
||||
free(pExprMsg);
|
||||
goto _over;
|
||||
}
|
||||
|
||||
SSqlGroupbyExpr *pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, pGroupColIndex, &code);
|
||||
pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, pGroupColIndex, &code);
|
||||
if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
|
||||
goto _over;
|
||||
}
|
||||
|
@ -5939,6 +5954,10 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
|
|||
}
|
||||
|
||||
(*pQInfo) = createQInfoImpl(pQueryMsg, pTableIdList, pGroupbyExpr, pExprs, &tableGroupInfo, pTagColumnInfo);
|
||||
pExprs = NULL;
|
||||
pGroupbyExpr = NULL;
|
||||
pTagColumnInfo = NULL;
|
||||
|
||||
if ((*pQInfo) == NULL) {
|
||||
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto _over;
|
||||
|
@ -5947,9 +5966,15 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
|
|||
code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery);
|
||||
|
||||
_over:
|
||||
tfree(tagCond);
|
||||
tfree(tbnameCond);
|
||||
tfree(pGroupColIndex);
|
||||
free(tagCond);
|
||||
free(tbnameCond);
|
||||
free(pGroupColIndex);
|
||||
if (pGroupbyExpr != NULL) {
|
||||
taosArrayDestroy(pGroupbyExpr->columnInfo);
|
||||
free(pGroupbyExpr);
|
||||
}
|
||||
free(pTagColumnInfo);
|
||||
free(pExprs);
|
||||
taosArrayDestroy(pTableIdList);
|
||||
|
||||
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
|
||||
|
|
|
@ -138,7 +138,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
|
|||
SWindowResult *pResult = &pWindowResInfo->pResult[k];
|
||||
int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
|
||||
tDataTypeDesc[pWindowResInfo->type].nSize);
|
||||
|
||||
assert(p != NULL);
|
||||
int32_t v = (*p - num);
|
||||
assert(v >= 0 && v <= pWindowResInfo->size);
|
||||
taosHashPut(pWindowResInfo->hashList, (char *)&pResult->window.skey, tDataTypeDesc[pWindowResInfo->type].nSize,
|
||||
|
|
|
@ -879,8 +879,8 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
|
|||
UNUSED(ret);
|
||||
|
||||
for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) {
|
||||
ret = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
|
||||
UNUSED(ret);
|
||||
size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
|
||||
UNUSED(sz);
|
||||
tMemBucketPut(pMemBucket, pPage->data, pPage->num);
|
||||
}
|
||||
|
||||
|
@ -965,10 +965,11 @@ char *getFirstElemOfMemBuffer(tMemBucketSegment *pSeg, int32_t slotIdx, tFilePag
|
|||
*/
|
||||
tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0];
|
||||
assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize);
|
||||
|
||||
fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET);
|
||||
size_t ret = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
|
||||
int32_t ret;
|
||||
ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET);
|
||||
UNUSED(ret);
|
||||
size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
|
||||
UNUSED(sz);
|
||||
thisVal = pPage->data;
|
||||
}
|
||||
return thisVal;
|
||||
|
|
|
@ -65,8 +65,10 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
|
|||
|
||||
// validate the file magic number
|
||||
STSBufFileHeader header = {0};
|
||||
fseek(pTSBuf->f, 0, SEEK_SET);
|
||||
fread(&header, 1, sizeof(STSBufFileHeader), pTSBuf->f);
|
||||
int32_t ret = fseek(pTSBuf->f, 0, SEEK_SET);
|
||||
UNUSED(ret);
|
||||
size_t sz = fread(&header, 1, sizeof(STSBufFileHeader), pTSBuf->f);
|
||||
UNUSED(sz);
|
||||
|
||||
// invalid file
|
||||
if (header.magic != TS_COMP_FILE_MAGIC) {
|
||||
|
@ -97,22 +99,30 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
|
|||
size_t infoSize = sizeof(STSVnodeBlockInfo) * pTSBuf->numOfVnodes;
|
||||
|
||||
STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize);
|
||||
if (buf == NULL) {
|
||||
tsBufDestory(pTSBuf);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
//int64_t pos = ftell(pTSBuf->f); //pos not used
|
||||
fread(buf, infoSize, 1, pTSBuf->f);
|
||||
sz = fread(buf, infoSize, 1, pTSBuf->f);
|
||||
UNUSED(sz);
|
||||
|
||||
// the length value for each vnode is not kept in file, so does not set the length value
|
||||
for (int32_t i = 0; i < pTSBuf->numOfVnodes; ++i) {
|
||||
STSVnodeBlockInfoEx* pBlockList = &pTSBuf->pData[i];
|
||||
memcpy(&pBlockList->info, &buf[i], sizeof(STSVnodeBlockInfo));
|
||||
}
|
||||
|
||||
free(buf);
|
||||
|
||||
fseek(pTSBuf->f, 0, SEEK_END);
|
||||
ret = fseek(pTSBuf->f, 0, SEEK_END);
|
||||
UNUSED(ret);
|
||||
|
||||
struct stat fileStat;
|
||||
fstat(fileno(pTSBuf->f), &fileStat);
|
||||
if (fstat(fileno(pTSBuf->f), &fileStat) != 0) {
|
||||
tsBufDestory(pTSBuf);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pTSBuf->fileSize = (uint32_t)fileStat.st_size;
|
||||
tsBufResetPos(pTSBuf);
|
||||
|
@ -278,19 +288,24 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
|
|||
* set the right position for the reversed traverse, the reversed traverse is started from
|
||||
* the end of each comp data block
|
||||
*/
|
||||
fseek(pTSBuf->f, -sizeof(pBlock->padding), SEEK_CUR);
|
||||
fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
|
||||
int32_t ret = fseek(pTSBuf->f, -sizeof(pBlock->padding), SEEK_CUR);
|
||||
size_t sz = fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
|
||||
UNUSED(sz);
|
||||
|
||||
pBlock->compLen = pBlock->padding;
|
||||
int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag);
|
||||
fseek(pTSBuf->f, -offset, SEEK_CUR);
|
||||
ret = fseek(pTSBuf->f, -offset, SEEK_CUR);
|
||||
UNUSED(ret);
|
||||
}
|
||||
|
||||
fread(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f);
|
||||
fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
|
||||
|
||||
fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
|
||||
fread(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
|
||||
size_t sz = fread(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f);
|
||||
UNUSED(sz);
|
||||
sz = fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
|
||||
UNUSED(sz);
|
||||
sz = fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
|
||||
UNUSED(sz);
|
||||
sz = fread(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
|
||||
UNUSED(sz);
|
||||
|
||||
if (decomp) {
|
||||
pTSBuf->tsData.len =
|
||||
|
@ -299,12 +314,13 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
|
|||
}
|
||||
|
||||
// read the comp length at the length of comp block
|
||||
fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
|
||||
sz = fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
|
||||
UNUSED(sz);
|
||||
|
||||
// for backwards traverse, set the start position at the end of previous block
|
||||
if (order == TSDB_ORDER_DESC) {
|
||||
int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag);
|
||||
int64_t r = fseek(pTSBuf->f, -offset, SEEK_CUR);
|
||||
int32_t r = fseek(pTSBuf->f, -offset, SEEK_CUR);
|
||||
UNUSED(r);
|
||||
}
|
||||
|
||||
|
@ -441,7 +457,8 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int
|
|||
STSBlock* pBlock = &pTSBuf->block;
|
||||
int32_t compBlockSize =
|
||||
pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag);
|
||||
fseek(pTSBuf->f, -compBlockSize, SEEK_CUR);
|
||||
int32_t ret = fseek(pTSBuf->f, -compBlockSize, SEEK_CUR);
|
||||
UNUSED(ret);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -538,7 +555,7 @@ int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
|
|||
|
||||
assert(pHeader->tsOrder == TSDB_ORDER_ASC || pHeader->tsOrder == TSDB_ORDER_DESC);
|
||||
|
||||
int64_t r = fseek(pTSBuf->f, 0, SEEK_SET);
|
||||
int32_t r = fseek(pTSBuf->f, 0, SEEK_SET);
|
||||
if (r != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -743,7 +760,9 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
|
|||
int32_t oldSize = pDestBuf->fileSize;
|
||||
|
||||
struct stat fileStat;
|
||||
fstat(fileno(pDestBuf->f), &fileStat);
|
||||
if (fstat(fileno(pDestBuf->f), &fileStat) != 0) {
|
||||
return -1;
|
||||
}
|
||||
pDestBuf->fileSize = (uint32_t)fileStat.st_size;
|
||||
|
||||
assert(pDestBuf->fileSize == oldSize + size);
|
||||
|
@ -766,8 +785,10 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
|
|||
// update prev vnode length info in file
|
||||
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo);
|
||||
|
||||
fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET);
|
||||
fwrite((void*)pData, 1, len, pTSBuf->f);
|
||||
int32_t ret = fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET);
|
||||
UNUSED(ret);
|
||||
size_t sz = fwrite((void*)pData, 1, len, pTSBuf->f);
|
||||
UNUSED(sz);
|
||||
pTSBuf->fileSize += len;
|
||||
|
||||
pTSBuf->tsOrder = order;
|
||||
|
|
|
@ -179,7 +179,7 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
pRsp->affectedRows = htonl(affectedrows);
|
||||
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -648,7 +648,7 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
|
|||
}
|
||||
|
||||
pRepo->config = *pCfg;
|
||||
pRepo->appH = *pAppH;
|
||||
if (pAppH) pRepo->appH = *pAppH;
|
||||
|
||||
pRepo->tsdbMeta = tsdbNewMeta(pCfg);
|
||||
if (pRepo->tsdbMeta == NULL) {
|
||||
|
|
|
@ -788,6 +788,9 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
|
|||
}
|
||||
|
||||
if (addIdx && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
|
||||
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, tsdbGetTableSchema(pTable));
|
||||
}
|
||||
|
||||
tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||
TABLE_TID(pTable), TABLE_UID(pTable));
|
||||
|
|
|
@ -173,9 +173,14 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
|
|||
close(pHelper->files.nHeadF.fd);
|
||||
pHelper->files.nHeadF.fd = -1;
|
||||
if (hasError) {
|
||||
remove(pHelper->files.nHeadF.fname);
|
||||
(void)remove(pHelper->files.nHeadF.fname);
|
||||
} else {
|
||||
rename(pHelper->files.nHeadF.fname, pHelper->files.headF.fname);
|
||||
if (rename(pHelper->files.nHeadF.fname, pHelper->files.headF.fname) < 0) {
|
||||
tsdbError("vgId:%d failed to rename file from %s to %s since %s", REPO_ID(pHelper->pRepo),
|
||||
pHelper->files.nHeadF.fname, pHelper->files.headF.fname, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
pHelper->files.headF.info = pHelper->files.nHeadF.info;
|
||||
}
|
||||
}
|
||||
|
@ -186,9 +191,14 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
|
|||
close(pHelper->files.nLastF.fd);
|
||||
pHelper->files.nLastF.fd = -1;
|
||||
if (hasError) {
|
||||
remove(pHelper->files.nLastF.fname);
|
||||
(void)remove(pHelper->files.nLastF.fname);
|
||||
} else {
|
||||
rename(pHelper->files.nLastF.fname, pHelper->files.lastF.fname);
|
||||
if (rename(pHelper->files.nLastF.fname, pHelper->files.lastF.fname) < 0) {
|
||||
tsdbError("vgId:%d failed to rename file from %s to %s since %s", REPO_ID(pHelper->pRepo),
|
||||
pHelper->files.nLastF.fname, pHelper->files.lastF.fname, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
pHelper->files.lastF.info = pHelper->files.nLastF.info;
|
||||
}
|
||||
}
|
||||
|
@ -306,8 +316,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
|
|||
|
||||
if (pCompBlock->numOfSubBlocks > 1) {
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1;
|
||||
ASSERT(pHelper->pDataCols[0]->numOfRows > 0 &&
|
||||
pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
|
||||
ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
|
||||
pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0)
|
||||
return -1;
|
||||
|
@ -330,14 +339,27 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
|
|||
}
|
||||
|
||||
int tsdbWriteCompInfo(SRWHelper *pHelper) {
|
||||
off_t offset = 0;
|
||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
||||
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
|
||||
if (pIdx->offset > 0) {
|
||||
pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
|
||||
if (pIdx->offset < 0) return -1;
|
||||
offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
|
||||
if (offset < 0) {
|
||||
tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname,
|
||||
strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pIdx->offset = offset;
|
||||
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
|
||||
|
||||
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1;
|
||||
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) {
|
||||
tsdbError("vgId:%d failed to send %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
|
||||
pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER;
|
||||
|
@ -345,12 +367,23 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
|
|||
pHelper->pCompInfo->checksum = 0;
|
||||
ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0);
|
||||
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);
|
||||
pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
|
||||
offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
|
||||
if (offset < 0) {
|
||||
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname,
|
||||
strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
pIdx->offset = offset;
|
||||
pIdx->uid = pHelper->tableInfo.uid;
|
||||
if (pIdx->offset < 0) return -1;
|
||||
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
|
||||
|
||||
if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1;
|
||||
if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) {
|
||||
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
|
||||
pHelper->files.nHeadF.fname, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -567,24 +600,24 @@ _err:
|
|||
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
|
||||
ASSERT(pHelper->files.lastF.fd > 0);
|
||||
struct stat st;
|
||||
fstat(pHelper->files.lastF.fd, &st);
|
||||
if (fstat(pHelper->files.lastF.fd, &st) < 0) return true;
|
||||
if (st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE) return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
|
||||
SCompBlock *pCompBlock, bool isLast, bool isSuperBlock) {
|
||||
STsdbCfg *pCfg = &(pHelper->pRepo->config);
|
||||
STsdbCfg * pCfg = &(pHelper->pRepo->config);
|
||||
SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
|
||||
int64_t offset = 0;
|
||||
|
||||
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pCfg->maxRowsPerFileBlock);
|
||||
ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true);
|
||||
|
||||
|
||||
offset = lseek(pFile->fd, 0, SEEK_END);
|
||||
if (offset < 0) {
|
||||
tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
|
||||
tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname,
|
||||
strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
@ -639,9 +672,9 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
|||
}
|
||||
}
|
||||
|
||||
pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))(
|
||||
(char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->pBuffer) - lsize, pCfg->compression,
|
||||
pHelper->compBuffer, tsizeof(pHelper->compBuffer));
|
||||
pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
|
||||
tsizeof(pHelper->pBuffer) - lsize, pCfg->compression,
|
||||
pHelper->compBuffer, tsizeof(pHelper->compBuffer));
|
||||
} else {
|
||||
pCompCol->len = tlen;
|
||||
memcpy(tptr, pDataCol->pData, pCompCol->len);
|
||||
|
@ -725,8 +758,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
// ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
|
||||
|
||||
if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append
|
||||
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pCfg->minRowsPerFileBlock &&
|
||||
blkIdx == pIdx->numOfBlocks - 1);
|
||||
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pCfg->minRowsPerFileBlock && blkIdx == pIdx->numOfBlocks - 1);
|
||||
int defaultRowsToWrite = pCfg->maxRowsPerFileBlock * 4 / 5; // TODO: make a interface
|
||||
|
||||
rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows);
|
||||
|
@ -1051,7 +1083,7 @@ static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
|
|||
|
||||
static int tsdbInitHelperFile(SRWHelper *pHelper) {
|
||||
STsdbCfg *pCfg = &pHelper->pRepo->config;
|
||||
size_t tsize = sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM);
|
||||
size_t tsize = sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM);
|
||||
pHelper->pCompIdx = (SCompIdx *)tmalloc(tsize);
|
||||
if (pHelper->pCompIdx == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
|
@ -1099,10 +1131,8 @@ static int tsdbInitHelperBlock(SRWHelper *pHelper) {
|
|||
STsdbRepo *pRepo = helperRepo(pHelper);
|
||||
STsdbMeta *pMeta = pHelper->pRepo->tsdbMeta;
|
||||
|
||||
pHelper->pDataCols[0] =
|
||||
tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
|
||||
pHelper->pDataCols[1] =
|
||||
tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
|
||||
pHelper->pDataCols[0] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
|
||||
pHelper->pDataCols[1] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
|
||||
if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -1222,12 +1252,16 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
|
|||
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
|
||||
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
||||
|
||||
ASSERT(tsizeof(pHelper->pBuffer) >= pCompBlock->len);
|
||||
|
||||
SCompData *pCompData = (SCompData *)pHelper->pBuffer;
|
||||
|
||||
SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
|
||||
|
||||
pHelper->pBuffer = trealloc(pHelper->pBuffer, pCompBlock->len);
|
||||
if (pHelper->pBuffer == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
int fd = pFile->fd;
|
||||
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) {
|
||||
tsdbError("vgId:%d tid:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
|
||||
|
|
|
@ -193,7 +193,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
|||
};
|
||||
|
||||
assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
|
||||
info.pTableObj->type == TSDB_CHILD_TABLE));
|
||||
info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
|
||||
|
||||
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
|
||||
}
|
||||
|
@ -2236,7 +2236,7 @@ int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* p
|
|||
goto _error;
|
||||
}
|
||||
|
||||
assert(pTable->type == TSDB_CHILD_TABLE || pTable->type == TSDB_NORMAL_TABLE);
|
||||
assert(pTable->type == TSDB_CHILD_TABLE || pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_STREAM_TABLE);
|
||||
tsdbRefTable(pTable);
|
||||
if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||
|
||||
add_executable(tsdbTests ${SOURCE_LIST})
|
||||
target_link_libraries(tsdbTests gtest gtest_main pthread common tsdb)
|
||||
target_link_libraries(tsdbTests gtest gtest_main pthread common tsdb tutil trpc)
|
||||
|
||||
add_test(NAME unit COMMAND ${CMAKE_CURRENT_BINARY_DIR}/tsdbTests)
|
|
@ -2,9 +2,8 @@
|
|||
#include <stdlib.h>
|
||||
#include <sys/time.h>
|
||||
|
||||
#include "tdataformat.h"
|
||||
#include "tsdb.h"
|
||||
#include "tsdbMain.h"
|
||||
#include "tskiplist.h"
|
||||
|
||||
static double getCurTime() {
|
||||
struct timeval tv;
|
||||
|
@ -77,7 +76,7 @@ static int insertData(SInsertInfo *pInfo) {
|
|||
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||
pMsg->compressed = htonl(pMsg->numOfBlocks);
|
||||
|
||||
if (tsdbInsertData(pInfo->pRepo, pMsg) < 0) {
|
||||
if (tsdbInsertData(pInfo->pRepo, pMsg, NULL) < 0) {
|
||||
tfree(pMsg);
|
||||
return -1;
|
||||
}
|
||||
|
@ -90,222 +89,72 @@ static int insertData(SInsertInfo *pInfo) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
TEST(TsdbTest, DISABLED_tableEncodeDecode) {
|
||||
// TEST(TsdbTest, tableEncodeDecode) {
|
||||
STable *pTable = (STable *)malloc(sizeof(STable));
|
||||
static void tsdbSetCfg(STsdbCfg *pCfg, int32_t tsdbId, int32_t cacheBlockSize, int32_t totalBlocks, int32_t maxTables,
|
||||
int32_t daysPerFile, int32_t keep, int32_t minRows, int32_t maxRows, int8_t precision,
|
||||
int8_t compression) {
|
||||
pCfg->tsdbId = tsdbId;
|
||||
pCfg->cacheBlockSize = cacheBlockSize;
|
||||
pCfg->totalBlocks = totalBlocks;
|
||||
pCfg->maxTables = maxTables;
|
||||
pCfg->daysPerFile = daysPerFile;
|
||||
pCfg->keep = keep;
|
||||
pCfg->minRowsPerFileBlock = minRows;
|
||||
pCfg->maxRowsPerFileBlock = maxRows;
|
||||
pCfg->precision = precision;
|
||||
pCfg->compression = compression;
|
||||
}
|
||||
|
||||
pTable->type = TSDB_NORMAL_TABLE;
|
||||
pTable->tableId.uid = 987607499877672L;
|
||||
pTable->tableId.tid = 0;
|
||||
pTable->superUid = -1;
|
||||
pTable->sversion = 0;
|
||||
pTable->tagSchema = NULL;
|
||||
pTable->tagVal = NULL;
|
||||
int nCols = 5;
|
||||
STSchema *schema = tdNewSchema(nCols);
|
||||
static void tsdbSetTableCfg(STableCfg *pCfg) {
|
||||
STSchemaBuilder schemaBuilder = {0};
|
||||
|
||||
for (int i = 0; i < nCols; i++) {
|
||||
if (i == 0) {
|
||||
tdSchemaAddCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1);
|
||||
} else {
|
||||
tdSchemaAddCol(schema, TSDB_DATA_TYPE_INT, i, -1);
|
||||
}
|
||||
pCfg->type = TSDB_NORMAL_TABLE;
|
||||
pCfg->superUid = TSDB_INVALID_SUPER_TABLE_ID;
|
||||
pCfg->tableId.tid = 1;
|
||||
pCfg->tableId.uid = 5849583783847394;
|
||||
tdInitTSchemaBuilder(&schemaBuilder, 0);
|
||||
|
||||
int colId = 0;
|
||||
for (int i = 0; i < 5; i++) {
|
||||
tdAddColToSchema(&schemaBuilder, (colId == 0) ? TSDB_DATA_TYPE_TIMESTAMP : TSDB_DATA_TYPE_INT, colId, 0);
|
||||
colId++;
|
||||
}
|
||||
|
||||
pTable->schema = schema;
|
||||
pCfg->schema = tdGetSchemaFromBuilder(&schemaBuilder);
|
||||
pCfg->name = strdup("t1");
|
||||
|
||||
int bufLen = 0;
|
||||
void *buf = tsdbEncodeTable(pTable, &bufLen);
|
||||
|
||||
STable *tTable = tsdbDecodeTable(buf, bufLen);
|
||||
|
||||
ASSERT_EQ(pTable->type, tTable->type);
|
||||
ASSERT_EQ(pTable->tableId.uid, tTable->tableId.uid);
|
||||
ASSERT_EQ(pTable->tableId.tid, tTable->tableId.tid);
|
||||
ASSERT_EQ(pTable->superUid, tTable->superUid);
|
||||
ASSERT_EQ(pTable->sversion, tTable->sversion);
|
||||
ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0);
|
||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||
}
|
||||
|
||||
// TEST(TsdbTest, DISABLED_createRepo) {
|
||||
TEST(TsdbTest, createRepo) {
|
||||
STsdbCfg config;
|
||||
STsdbRepo *repo;
|
||||
TEST(TsdbTest, testInsertSpeed) {
|
||||
int vnode = 1;
|
||||
int ret = 0;
|
||||
STsdbCfg tsdbCfg;
|
||||
STableCfg tableCfg;
|
||||
std::string testDir = "./test";
|
||||
char * rootDir = strdup((testDir + "/vnode" + std::to_string(vnode)).c_str());
|
||||
|
||||
// 1. Create a tsdb repository
|
||||
tsdbSetDefaultCfg(&config);
|
||||
ASSERT_EQ(tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL), 0);
|
||||
tsdbDebugFlag = 131; //NOTE: you must set the flag
|
||||
|
||||
TSDB_REPO_T *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
|
||||
ASSERT_NE(pRepo, nullptr);
|
||||
taosRemoveDir(rootDir);
|
||||
|
||||
// 2. Create a normal table
|
||||
STableCfg tCfg;
|
||||
ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1);
|
||||
ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NORMAL_TABLE, 987607499877672L, 0), 0);
|
||||
tsdbTableSetName(&tCfg, "test", false);
|
||||
// Create and open repository
|
||||
tsdbSetCfg(&tsdbCfg, 1, 16, 4, -1, -1, -1, -1, -1, -1, -1);
|
||||
tsdbCreateRepo(rootDir, &tsdbCfg);
|
||||
TSDB_REPO_T *repo = tsdbOpenRepo(rootDir, NULL);
|
||||
ASSERT_NE(repo, nullptr);
|
||||
|
||||
int nCols = 5;
|
||||
STSchema *schema = tdNewSchema(nCols);
|
||||
// Create table
|
||||
tsdbSetTableCfg(&tableCfg);
|
||||
tsdbCreateTable(repo, &tableCfg);
|
||||
|
||||
for (int i = 0; i < nCols; i++) {
|
||||
if (i == 0) {
|
||||
tdSchemaAddCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1);
|
||||
} else {
|
||||
tdSchemaAddCol(schema, TSDB_DATA_TYPE_INT, i, -1);
|
||||
}
|
||||
}
|
||||
// Insert data
|
||||
SInsertInfo iInfo = {repo, true, 1, 5849583783847394, 0, 1590000000000, 10, 10000000, 100, tableCfg.schema};
|
||||
|
||||
tsdbTableSetSchema(&tCfg, schema, true);
|
||||
insertData(&iInfo);
|
||||
|
||||
tsdbCreateTable(pRepo, &tCfg);
|
||||
|
||||
// Insert Some Data
|
||||
SInsertInfo iInfo = {
|
||||
.pRepo = pRepo,
|
||||
// .isAscend = true,
|
||||
.isAscend = false,
|
||||
.tid = tCfg.tableId.tid,
|
||||
.uid = tCfg.tableId.uid,
|
||||
.sversion = tCfg.sversion,
|
||||
.startTime = 1584081000000,
|
||||
.interval = 1000,
|
||||
.totalRows = 10000000,
|
||||
.rowsPerSubmit = 1,
|
||||
.pSchema = schema
|
||||
};
|
||||
|
||||
ASSERT_EQ(insertData(&iInfo), 0);
|
||||
|
||||
// Close the repository
|
||||
tsdbCloseRepo(pRepo);
|
||||
|
||||
// Open the repository again
|
||||
pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
|
||||
repo = (STsdbRepo *)pRepo;
|
||||
ASSERT_NE(pRepo, nullptr);
|
||||
|
||||
// // Insert more data
|
||||
// iInfo.startTime = iInfo.startTime + iInfo.interval * iInfo.totalRows;
|
||||
// iInfo.totalRows = 10;
|
||||
// iInfo.pRepo = pRepo;
|
||||
// ASSERT_EQ(insertData(&iInfo), 0);
|
||||
|
||||
// // Close the repository
|
||||
// tsdbCloseRepo(pRepo);
|
||||
|
||||
// // Open the repository again
|
||||
// pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
|
||||
// repo = (STsdbRepo *)pRepo;
|
||||
// ASSERT_NE(pRepo, nullptr);
|
||||
|
||||
// // Read from file
|
||||
// SRWHelper rhelper;
|
||||
// tsdbInitReadHelper(&rhelper, repo);
|
||||
|
||||
// SFileGroup *pFGroup = tsdbSearchFGroup(repo->tsdbFileH, 1833);
|
||||
// ASSERT_NE(pFGroup, nullptr);
|
||||
// ASSERT_GE(tsdbSetAndOpenHelperFile(&rhelper, pFGroup), 0);
|
||||
|
||||
// STable *pTable = tsdbGetTableByUid(repo->tsdbMeta, tCfg.tableId.uid);
|
||||
// ASSERT_NE(pTable, nullptr);
|
||||
// tsdbSetHelperTable(&rhelper, pTable, repo);
|
||||
|
||||
// ASSERT_EQ(tsdbLoadCompInfo(&rhelper, NULL), 0);
|
||||
// ASSERT_EQ(tsdbLoadBlockData(&rhelper, blockAtIdx(&rhelper, 0), NULL), 0);
|
||||
|
||||
int k = 0;
|
||||
}
|
||||
|
||||
TEST(TsdbTest, DISABLED_openRepo) {
|
||||
// TEST(TsdbTest, openRepo) {
|
||||
// tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode2/tsdb", NULL);
|
||||
// ASSERT_NE(repo, nullptr);
|
||||
|
||||
// STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
|
||||
// SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1655);
|
||||
|
||||
// for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
// tsdbOpenFile(&pGroup->files[type], O_RDONLY);
|
||||
// }
|
||||
|
||||
// SCompIdx *pIdx = (SCompIdx *)calloc(pRepo->config.maxTables, sizeof(SCompIdx));
|
||||
// tsdbLoadCompIdx(pGroup, (void *)pIdx, pRepo->config.maxTables);
|
||||
|
||||
// SCompInfo *pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + pIdx[1].len);
|
||||
|
||||
// tsdbLoadCompBlocks(pGroup, &pIdx[1], (void *)pCompInfo);
|
||||
|
||||
// int blockIdx = 0;
|
||||
// SCompBlock *pBlock = &(pCompInfo->blocks[blockIdx]);
|
||||
|
||||
// SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols);
|
||||
|
||||
// tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData);
|
||||
|
||||
// STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid);
|
||||
// SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5);
|
||||
// tdInitDataCols(pDataCols, tsdbGetTableSchema(pRepo->tsdbMeta, pTable));
|
||||
|
||||
// tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData);
|
||||
|
||||
// tdResetDataCols(pDataCols);
|
||||
|
||||
// tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock + 1, 1, pDataCols, pCompData);
|
||||
|
||||
|
||||
// int k = 0;
|
||||
|
||||
}
|
||||
|
||||
TEST(TsdbTest, DISABLED_createFileGroup) {
|
||||
SFileGroup fGroup;
|
||||
|
||||
// ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0);
|
||||
|
||||
int k = 0;
|
||||
tsdbCloseRepo(repo, 1);
|
||||
}
|
||||
|
||||
static char *getTKey(const void *data) {
|
||||
return (char *)data;
|
||||
}
|
||||
|
||||
static void insertSkipList(bool isAscend) {
|
||||
TSKEY start_time = 1587393453000;
|
||||
TSKEY interval = 1000;
|
||||
|
||||
SSkipList *pList = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY), 0, 0, 1, getTKey);
|
||||
ASSERT_NE(pList, nullptr);
|
||||
|
||||
for (size_t i = 0; i < 20000000; i++)
|
||||
{
|
||||
TSKEY time = isAscend ? (start_time + i * interval) : (start_time - i * interval);
|
||||
int32_t level = 0;
|
||||
int32_t headSize = 0;
|
||||
|
||||
tSkipListNewNodeInfo(pList, &level, &headSize);
|
||||
SSkipListNode *pNode = (SSkipListNode *)malloc(headSize + sizeof(TSKEY));
|
||||
ASSERT_NE(pNode, nullptr);
|
||||
pNode->level = level;
|
||||
*(TSKEY *)((char *)pNode + headSize) = time;
|
||||
tSkipListPut(pList, pNode);
|
||||
}
|
||||
|
||||
tSkipListDestroy(pList);
|
||||
}
|
||||
|
||||
TEST(TsdbTest, DISABLED_testSkipList) {
|
||||
// TEST(TsdbTest, testSkipList) {
|
||||
double stime = getCurTime();
|
||||
insertSkipList(true);
|
||||
double etime = getCurTime();
|
||||
|
||||
printf("Time used to insert 100000000 records takes %f seconds\n", etime-stime);
|
||||
|
||||
stime = getCurTime();
|
||||
insertSkipList(false);
|
||||
etime = getCurTime();
|
||||
|
||||
printf("Time used to insert 100000000 records takes %f seconds\n", etime-stime);
|
||||
}
|
|
@ -468,7 +468,8 @@ void taosHashTableResize(SHashObj *pHashObj) {
|
|||
return;
|
||||
}
|
||||
|
||||
void *pNewEntry = realloc(pHashObj->hashList, POINTER_BYTES * newSize);
|
||||
int32_t pointerSize = POINTER_BYTES;
|
||||
void *pNewEntry = realloc(pHashObj->hashList, pointerSize * newSize);
|
||||
if (pNewEntry == NULL) {// todo handle error
|
||||
// uTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
|
||||
return;
|
||||
|
|
|
@ -78,8 +78,8 @@ int tdCreateKVStore(char *fname) {
|
|||
return 0;
|
||||
|
||||
_err:
|
||||
if (fd > 0) close(fd);
|
||||
remove(fname);
|
||||
if (fd >= 0) close(fd);
|
||||
(void)remove(fname);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -106,15 +106,15 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
|
|||
goto _err;
|
||||
}
|
||||
|
||||
if (access(pStore->fsnap, F_OK) == 0) { // .snap file exists
|
||||
uTrace("file %s exists, try to recover the KV store", pStore->fsnap);
|
||||
pStore->sfd = open(pStore->fsnap, O_RDONLY);
|
||||
if (pStore->sfd < 0) {
|
||||
pStore->sfd = open(pStore->fsnap, O_RDONLY);
|
||||
if (pStore->sfd < 0) {
|
||||
if (errno != ENOENT) {
|
||||
uError("failed to open file %s since %s", pStore->fsnap, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
} else {
|
||||
uTrace("file %s exists, try to recover the KV store", pStore->fsnap);
|
||||
if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) {
|
||||
if (terrno != TSDB_CODE_COM_FILE_CORRUPTED) goto _err;
|
||||
} else {
|
||||
|
@ -133,7 +133,7 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
|
|||
|
||||
close(pStore->sfd);
|
||||
pStore->sfd = -1;
|
||||
remove(pStore->fsnap);
|
||||
(void)remove(pStore->fsnap);
|
||||
}
|
||||
|
||||
if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err;
|
||||
|
@ -212,7 +212,7 @@ _err:
|
|||
if (pStore->sfd > 0) {
|
||||
close(pStore->sfd);
|
||||
pStore->sfd = -1;
|
||||
remove(pStore->fsnap);
|
||||
(void)remove(pStore->fsnap);
|
||||
}
|
||||
if (pStore->fd > 0) {
|
||||
close(pStore->fd);
|
||||
|
@ -314,7 +314,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) {
|
|||
}
|
||||
pStore->fd = -1;
|
||||
|
||||
remove(pStore->fsnap);
|
||||
(void)remove(pStore->fsnap);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -56,8 +56,13 @@ int64_t user_mktime64(const unsigned int year0, const unsigned int mon0,
|
|||
year -= 1;
|
||||
}
|
||||
|
||||
int64_t res = (((((int64_t) (year/4 - year/100 + year/400 + (int64_t)(367*mon)/12 + day) +
|
||||
year*365 - 719499)*24 + hour)*60 + min)*60 + sec);
|
||||
//int64_t res = (((((int64_t) (year/4 - year/100 + year/400 + 367*mon/12 + day) +
|
||||
// year*365 - 719499)*24 + hour)*60 + min)*60 + sec);
|
||||
int64_t res;
|
||||
res = 367*((int64_t)mon)/12;
|
||||
res += year/4 - year/100 + year/400 + day + year*365 - 719499;
|
||||
res = res*24;
|
||||
res = ((res + hour) * 60 + min) * 60 + sec;
|
||||
|
||||
return (res + timezone);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,211 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/deploy.sh -n dnode2 -i 2
|
||||
system sh/deploy.sh -n dnode3 -i 3
|
||||
system sh/deploy.sh -n dnode4 -i 4
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
|
||||
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 1
|
||||
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1
|
||||
system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c walLevel -v 2
|
||||
system sh/cfg.sh -n dnode2 -c walLevel -v 2
|
||||
system sh/cfg.sh -n dnode3 -c walLevel -v 2
|
||||
system sh/cfg.sh -n dnode4 -c walLevel -v 2
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c balanceInterval -v 10
|
||||
system sh/cfg.sh -n dnode2 -c balanceInterval -v 10
|
||||
system sh/cfg.sh -n dnode3 -c balanceInterval -v 10
|
||||
system sh/cfg.sh -n dnode4 -c balanceInterval -v 10
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
|
||||
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
|
||||
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
|
||||
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c alternativeRole -v 1
|
||||
system sh/cfg.sh -n dnode2 -c alternativeRole -v 2
|
||||
system sh/cfg.sh -n dnode3 -c alternativeRole -v 2
|
||||
system sh/cfg.sh -n dnode4 -c alternativeRole -v 2
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4
|
||||
system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 4
|
||||
system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 4
|
||||
system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 4
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator
|
||||
system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator
|
||||
system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator
|
||||
system sh/cfg.sh -n dnode4 -c arbitrator -v $arbitrator
|
||||
|
||||
print ============== step0: start tarbitrator
|
||||
system sh/exec_tarbitrator.sh -s start
|
||||
|
||||
print ============== step1: start dnode1, only deploy mnode
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 3000
|
||||
sql connect
|
||||
|
||||
print ============== step2: start dnode2/dnode3/dnode4 and add into cluster , then create database with replica 3, and create table, insert data
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
system sh/exec.sh -n dnode4 -s start
|
||||
sql create dnode $hostname2
|
||||
sql create dnode $hostname3
|
||||
sql create dnode $hostname4
|
||||
sleep 3000
|
||||
|
||||
$totalTableNum = 20
|
||||
$sleepTimer = 3000
|
||||
|
||||
$db = db
|
||||
print create database $db replica 3 maxTables $totalTableNum
|
||||
sql create database $db replica 3 maxTables $totalTableNum
|
||||
sql use $db
|
||||
|
||||
# create table , insert data
|
||||
$stb = stb
|
||||
sql create table $stb (ts timestamp, c1 int) tags(t1 int)
|
||||
$rowNum = 500
|
||||
$tblNum = 10
|
||||
$totalRows = 0
|
||||
$tsStart = 1420041600000
|
||||
$tsEnd = 0
|
||||
|
||||
$i = 0
|
||||
while $i < $tblNum
|
||||
$tb = tb . $i
|
||||
sql create table $tb using $stb tags( $i )
|
||||
|
||||
$x = 0
|
||||
while $x < $rowNum
|
||||
$ts = $tsStart + $x
|
||||
sql insert into $tb values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x ) ( $ts + 10a , $x ) ( $ts + 11a , $x ) ( $ts + 12a , $x ) ( $ts + 13a , $x ) ( $ts + 14a , $x ) ( $ts + 15a , $x ) ( $ts + 16a , $x ) ( $ts + 17a , $x ) ( $ts + 18a , $x ) ( $ts + 19a , $x ) ( $ts + 20a , $x ) ( $ts + 21a , $x ) ( $ts + 22a , $x ) ( $ts + 23a , $x ) ( $ts + 24a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 35a , $x ) ( $ts + 36a , $x ) ( $ts + 37a , $x ) ( $ts + 38a , $x ) ( $ts + 39a , $x ) ( $ts + 40a , $x ) ( $ts + 41a , $x ) ( $ts + 42a , $x ) ( $ts + 43a , $x ) ( $ts + 44a , $x ) ( $ts + 45a , $x ) ( $ts + 46a , $x ) ( $ts + 47a , $x ) ( $ts + 48a , $x ) ( $ts + 49a , $x ) ( $ts + 50a , $x ) ( $ts + 51a , $x ) ( $ts + 52a , $x ) ( $ts + 53a , $x ) ( $ts + 54a , $x ) ( $ts + 55a , $x ) ( $ts + 56a , $x ) ( $ts + 57a , $x ) ( $ts + 58a , $x ) ( $ts + 59a , $x )
|
||||
$x = $x + 60
|
||||
endw
|
||||
$totalRows = $totalRows + $x
|
||||
print info: inserted $x rows into $tb and totalRows: $totalRows
|
||||
$i = $i + 1
|
||||
endw
|
||||
$tsEnd = $tsStart + $totalRows / $tblNum
|
||||
|
||||
sql select count(*) from $stb
|
||||
print data00 $data00
|
||||
if $data00 != $totalRows then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print ============== step3: stop dnode4
|
||||
system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
||||
sleep $sleepTimer
|
||||
$cnt = 0
|
||||
wait_dnode4_offline_0:
|
||||
$cnt = $cnt + 1
|
||||
if $cnt == 10 then
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
if $rows != 4 then
|
||||
sleep 2000
|
||||
goto wait_dnode4_offline_0
|
||||
endi
|
||||
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
|
||||
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
|
||||
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
|
||||
#print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
|
||||
#print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
|
||||
#print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
|
||||
#$dnode1Status = $data4_1
|
||||
$dnode2Status = $data4_2
|
||||
$dnode3Status = $data4_3
|
||||
$dnode4Status = $data4_4
|
||||
#$dnode5Status = $data4_5
|
||||
|
||||
if $dnode4Status != offline then
|
||||
sleep 2000
|
||||
goto wait_dnode4_offline_0
|
||||
endi
|
||||
|
||||
$cnt = 0
|
||||
wait_dnode4_vgroup_offline:
|
||||
$cnt = $cnt + 1
|
||||
if $cnt == 10 then
|
||||
return -1
|
||||
endi
|
||||
sql show vgroups
|
||||
if $rows != 1 then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_offline
|
||||
endi
|
||||
print show vgroups:
|
||||
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
|
||||
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
|
||||
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
|
||||
$dnode4Vtatus = $data4_2
|
||||
$dnode3Vtatus = $data7_2
|
||||
|
||||
if $dnode4Vtatus != offline then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_offline
|
||||
endi
|
||||
if $dnode3Vtatus != master then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_offline
|
||||
endi
|
||||
|
||||
print ============== step4: insert more data rows
|
||||
$tsStart = $tsEnd + 1000
|
||||
$i = 0
|
||||
while $i < $tblNum
|
||||
$tb = tb . $i
|
||||
|
||||
$x = 0
|
||||
while $x < $rowNum
|
||||
$ts = $tsStart + $x
|
||||
sql insert into $tb values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x ) ( $ts + 10a , $x ) ( $ts + 11a , $x ) ( $ts + 12a , $x ) ( $ts + 13a , $x ) ( $ts + 14a , $x ) ( $ts + 15a , $x ) ( $ts + 16a , $x ) ( $ts + 17a , $x ) ( $ts + 18a , $x ) ( $ts + 19a , $x ) ( $ts + 20a , $x ) ( $ts + 21a , $x ) ( $ts + 22a , $x ) ( $ts + 23a , $x ) ( $ts + 24a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 35a , $x ) ( $ts + 36a , $x ) ( $ts + 37a , $x ) ( $ts + 38a , $x ) ( $ts + 39a , $x ) ( $ts + 40a , $x ) ( $ts + 41a , $x ) ( $ts + 42a , $x ) ( $ts + 43a , $x ) ( $ts + 44a , $x ) ( $ts + 45a , $x ) ( $ts + 46a , $x ) ( $ts + 47a , $x ) ( $ts + 48a , $x ) ( $ts + 49a , $x ) ( $ts + 50a , $x ) ( $ts + 51a , $x ) ( $ts + 52a , $x ) ( $ts + 53a , $x ) ( $ts + 54a , $x ) ( $ts + 55a , $x ) ( $ts + 56a , $x ) ( $ts + 57a , $x ) ( $ts + 58a , $x ) ( $ts + 59a , $x )
|
||||
$x = $x + 60
|
||||
endw
|
||||
$totalRows = $totalRows + $x
|
||||
print info: inserted $x rows into $tb and totalRows: $totalRows
|
||||
$i = $i + 1
|
||||
endw
|
||||
|
||||
sql select count(*) from $stb
|
||||
print data00:$data00 totalRows:$totalRows
|
||||
if $data00 != $totalRows then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print ============== step5: restart dnode4, while alter table and insert data in other thead when dnode4 is syncing
|
||||
system sh/exec.sh -n dnode4 -s start
|
||||
run_back unique/arbitrator/sync_replica_createTable_background_add.sim
|
||||
|
||||
print ============== step6: check result
|
||||
#in background.sim, add 10 tables and insert 100 rows
|
||||
$totalRows = $totalRows + 100
|
||||
|
||||
$cnt = 0
|
||||
wait_table_created:
|
||||
$cnt = $cnt + 1
|
||||
if $cnt == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show tables
|
||||
if $rows != $totalTableNum then
|
||||
print rows:$rows totalTableNum:$totalTableNum
|
||||
sleep 2000
|
||||
goto wait_table_created
|
||||
endi
|
||||
|
||||
sql select count(*) from $stb
|
||||
if $data00 != $totalRows then
|
||||
print data00:$data00 totalRows:$totalRows
|
||||
sleep 2000
|
||||
goto wait_table_created
|
||||
endi
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,484 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/deploy.sh -n dnode2 -i 2
|
||||
system sh/deploy.sh -n dnode3 -i 3
|
||||
system sh/deploy.sh -n dnode4 -i 4
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
|
||||
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 1
|
||||
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1
|
||||
system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c walLevel -v 2
|
||||
system sh/cfg.sh -n dnode2 -c walLevel -v 2
|
||||
system sh/cfg.sh -n dnode3 -c walLevel -v 2
|
||||
system sh/cfg.sh -n dnode4 -c walLevel -v 2
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c balanceInterval -v 10
|
||||
system sh/cfg.sh -n dnode2 -c balanceInterval -v 10
|
||||
system sh/cfg.sh -n dnode3 -c balanceInterval -v 10
|
||||
system sh/cfg.sh -n dnode4 -c balanceInterval -v 10
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
|
||||
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
|
||||
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
|
||||
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c alternativeRole -v 1
|
||||
system sh/cfg.sh -n dnode2 -c alternativeRole -v 2
|
||||
system sh/cfg.sh -n dnode3 -c alternativeRole -v 2
|
||||
system sh/cfg.sh -n dnode4 -c alternativeRole -v 2
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4
|
||||
system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 4
|
||||
system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 4
|
||||
system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 4
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator
|
||||
system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator
|
||||
system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator
|
||||
system sh/cfg.sh -n dnode4 -c arbitrator -v $arbitrator
|
||||
|
||||
print ============== step0: start tarbitrator
|
||||
system sh/exec_tarbitrator.sh -s start
|
||||
|
||||
print ============== step1: start dnode1, only deploy mnode
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 3000
|
||||
sql connect
|
||||
|
||||
print ============== step2: start dnode2/dnode3/dnode4 and add into cluster , then create database with replica 3, and create table, insert data
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
system sh/exec.sh -n dnode4 -s start
|
||||
sql create dnode $hostname2
|
||||
sql create dnode $hostname3
|
||||
sql create dnode $hostname4
|
||||
sleep 3000
|
||||
|
||||
$totalTableNum = 20
|
||||
$sleepTimer = 3000
|
||||
|
||||
$db = db
|
||||
print create database $db replica 3 maxTables $totalTableNum
|
||||
sql create database $db replica 3 maxTables $totalTableNum
|
||||
sql use $db
|
||||
|
||||
# create table , insert data
|
||||
$stb = stb
|
||||
sql create table $stb (ts timestamp, c1 int) tags(t1 int)
|
||||
$rowNum = 500
|
||||
$tblNum = 10
|
||||
$totalRows = 0
|
||||
$tsStart = 1420041600000
|
||||
$tsEnd = 0
|
||||
|
||||
$i = 0
|
||||
while $i < $tblNum
|
||||
$tb = tb . $i
|
||||
sql create table $tb using $stb tags( $i )
|
||||
|
||||
$x = 0
|
||||
$ts = $tsStart + $x
|
||||
sql insert into $tb values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x )
|
||||
$x = $x + 10
|
||||
|
||||
$totalRows = $totalRows + $x
|
||||
print info: inserted $x rows into $tb and totalRows: $totalRows
|
||||
$i = $i + 1
|
||||
endw
|
||||
$tsEnd = $tsStart + $totalRows / $tblNum
|
||||
|
||||
sql select count(*) from $stb
|
||||
print data00 $data00
|
||||
if $data00 != $totalRows then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print ============== step3: stop dnode4
|
||||
system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
||||
$cnt = 0
|
||||
wait_dnode4_offline_0:
|
||||
$cnt = $cnt + 1
|
||||
if $cnt == 10 then
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
if $rows != 4 then
|
||||
sleep 2000
|
||||
goto wait_dnode4_offline_0
|
||||
endi
|
||||
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
|
||||
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
|
||||
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
|
||||
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
|
||||
$dnode1Status = $data4_1
|
||||
$dnode2Status = $data4_2
|
||||
$dnode3Status = $data4_3
|
||||
$dnode4Status = $data4_4
|
||||
|
||||
if $dnode4Status != offline then
|
||||
sleep 2000
|
||||
goto wait_dnode4_offline_0
|
||||
endi
|
||||
|
||||
$cnt = 0
|
||||
wait_dnode4_vgroup_offline:
|
||||
$cnt = $cnt + 1
|
||||
if $cnt == 10 then
|
||||
return -1
|
||||
endi
|
||||
sql show vgroups
|
||||
if $rows != 1 then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_offline
|
||||
endi
|
||||
print show vgroups:
|
||||
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
|
||||
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
|
||||
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
|
||||
$dnode4Vtatus = $data4_2
|
||||
$dnode3Vtatus = $data7_2
|
||||
|
||||
if $dnode4Vtatus != offline then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_offline
|
||||
endi
|
||||
if $dnode3Vtatus != master then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_offline
|
||||
endi
|
||||
|
||||
print ============== step4: drop some tables
|
||||
sql drop table tb0
|
||||
sql drop table tb9
|
||||
sql drop table tb1
|
||||
sql drop table tb8
|
||||
|
||||
$totalRows = $totalRows - 40
|
||||
sql select count(*) from $stb
|
||||
print data00 $data00
|
||||
if $data00 != $totalRows then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print ============== step5: restart dnode4, waiting sync end
|
||||
system sh/exec.sh -n dnode4 -s start
|
||||
|
||||
$cnt = 0
|
||||
wait_dnode4_ready:
|
||||
$cnt = $cnt + 1
|
||||
if $cnt == 10 then
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
if $rows != 4 then
|
||||
sleep 2000
|
||||
goto wait_dnode4_ready
|
||||
endi
|
||||
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
|
||||
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
|
||||
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
|
||||
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
|
||||
$dnode1Status = $data4_1
|
||||
$dnode2Status = $data4_2
|
||||
$dnode3Status = $data4_3
|
||||
$dnode4Status = $data4_4
|
||||
|
||||
if $dnode4Status != ready then
|
||||
sleep 2000
|
||||
goto wait_dnode4_ready
|
||||
endi
|
||||
|
||||
$cnt = 0
|
||||
wait_dnode4_vgroup_slave:
|
||||
$cnt = $cnt + 1
|
||||
if $cnt == 10 then
|
||||
return -1
|
||||
endi
|
||||
sql show vgroups
|
||||
if $rows != 1 then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_slave
|
||||
endi
|
||||
print show vgroups:
|
||||
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
|
||||
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
|
||||
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
|
||||
$dnode4Vtatus = $data4_2
|
||||
$dnode3Vtatus = $data7_2
|
||||
|
||||
if $dnode4Vtatus != slave then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_slave
|
||||
endi
|
||||
|
||||
print ============== step6: stop dnode2/dnode3 and remove their data dir, and restart dnode2/dnode3
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
||||
sleep 1000
|
||||
system rm -rf ../../../sim/dnode2/data
|
||||
system rm -rf ../../../sim/dnode3/data
|
||||
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
|
||||
$cnt = 0
|
||||
wait_dnode4_vgroup_master:
|
||||
$cnt = $cnt + 1
|
||||
if $cnt == 10 then
|
||||
return -1
|
||||
endi
|
||||
sql show vgroups
|
||||
if $rows != 1 then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_master
|
||||
endi
|
||||
print show vgroups:
|
||||
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
|
||||
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
|
||||
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
|
||||
$dnode4Vtatus = $data4_2
|
||||
$dnode3Vtatus = $data7_2
|
||||
|
||||
if $dnode4Vtatus != master then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_master
|
||||
endi
|
||||
|
||||
sql select count(*) from $stb
|
||||
print data00 $data00
|
||||
if $data00 != $totalRows then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
print ============== step7: stop dnode4 and create some new tables
|
||||
system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
||||
|
||||
$ts = $tsStart
|
||||
sql insert into tb10 using stb tags(10) values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x )
|
||||
sql insert into tb11 using stb tags(11) values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x )
|
||||
sql insert into tb12 using stb tags(12) values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x )
|
||||
sql insert into tb13 using stb tags(13) values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x )
|
||||
|
||||
$totalRows = $totalRows + 40
|
||||
sql select count(*) from $stb
|
||||
print data00 $data00
|
||||
if $data00 != $totalRows then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print ============== step8: restart dnode4, waiting sync end
|
||||
system sh/exec.sh -n dnode4 -s start
|
||||
|
||||
$cnt = 0
|
||||
wait_dnode4_ready_2:
|
||||
$cnt = $cnt + 1
|
||||
if $cnt == 10 then
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
if $rows != 4 then
|
||||
sleep 2000
|
||||
goto wait_dnode4_ready_2
|
||||
endi
|
||||
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
|
||||
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
|
||||
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
|
||||
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
|
||||
$dnode1Status = $data4_1
|
||||
$dnode2Status = $data4_2
|
||||
$dnode3Status = $data4_3
|
||||
$dnode4Status = $data4_4
|
||||
|
||||
if $dnode4Status != ready then
|
||||
sleep 2000
|
||||
goto wait_dnode4_ready_2
|
||||
endi
|
||||
|
||||
$cnt = 0
|
||||
wait_dnode4_vgroup_slave_2:
|
||||
$cnt = $cnt + 1
|
||||
if $cnt == 10 then
|
||||
return -1
|
||||
endi
|
||||
sql show vgroups
|
||||
if $rows != 1 then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_slave_2
|
||||
endi
|
||||
print show vgroups:
|
||||
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
|
||||
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
|
||||
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
|
||||
$dnode4Vtatus = $data4_2
|
||||
$dnode3Vtatus = $data7_2
|
||||
|
||||
if $dnode4Vtatus != slave then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_slave_2
|
||||
endi
|
||||
|
||||
print ============== step9: stop dnode2/dnode3 and remove their data dir, and restart dnode2/dnode3
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
||||
sleep 1000
|
||||
system rm -rf ../../../sim/dnode2/data
|
||||
system rm -rf ../../../sim/dnode3/data
|
||||
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
|
||||
$cnt = 0
|
||||
wait_dnode4_vgroup_master_2:
|
||||
$cnt = $cnt + 1
|
||||
if $cnt == 10 then
|
||||
return -1
|
||||
endi
|
||||
sql show vgroups
|
||||
if $rows != 1 then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_master_2
|
||||
endi
|
||||
print show vgroups:
|
||||
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
|
||||
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
|
||||
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
|
||||
$dnode4Vtatus = $data4_2
|
||||
$dnode3Vtatus = $data7_2
|
||||
|
||||
if $dnode4Vtatus != master then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_master_2
|
||||
endi
|
||||
|
||||
sql select count(*) from $stb
|
||||
print data00 $data00
|
||||
if $data00 != $totalRows then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
print ============== step10: stop dnode4 and alter table column
|
||||
system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
||||
|
||||
sql alter table stb add column c2 int
|
||||
sql alter table stb add column c3 int
|
||||
sql alter table stb add column c4 int
|
||||
sql alter table stb drop column c1
|
||||
|
||||
sql alter table stb add tag t2 int
|
||||
sql alter table stb add tag t3 int
|
||||
sql alter table stb add tag t4 int
|
||||
sql_error alter table stb drop tag t1
|
||||
|
||||
$ts = $tsEnd + 10000
|
||||
sql insert into tb2 values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x )
|
||||
sql insert into tb3 values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x )
|
||||
sql insert into tb4 values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x )
|
||||
sql insert into tb5 values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x )
|
||||
|
||||
|
||||
$ts = $tsStart
|
||||
sql insert into tb14 using stb tags(14) values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x )
|
||||
sql insert into tb15 using stb tags(15) values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x )
|
||||
sql insert into tb16 using stb tags(16) values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x )
|
||||
sql insert into tb17 using stb tags(17) values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x )
|
||||
|
||||
$totalRows = $totalRows + 80
|
||||
sql select count(*) from $stb
|
||||
print data00 $data00
|
||||
if $data00 != $totalRows then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print ============== step11: restart dnode4, waiting sync end
|
||||
system sh/exec.sh -n dnode4 -s start
|
||||
|
||||
$cnt = 0
|
||||
wait_dnode4_ready_3:
|
||||
$cnt = $cnt + 1
|
||||
if $cnt == 10 then
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
if $rows != 4 then
|
||||
sleep 2000
|
||||
goto wait_dnode4_ready_3
|
||||
endi
|
||||
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
|
||||
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
|
||||
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
|
||||
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
|
||||
$dnode1Status = $data4_1
|
||||
$dnode2Status = $data4_2
|
||||
$dnode3Status = $data4_3
|
||||
$dnode4Status = $data4_4
|
||||
|
||||
if $dnode4Status != ready then
|
||||
sleep 2000
|
||||
goto wait_dnode4_ready_3
|
||||
endi
|
||||
|
||||
$cnt = 0
|
||||
wait_dnode4_vgroup_slave_3:
|
||||
$cnt = $cnt + 1
|
||||
if $cnt == 10 then
|
||||
return -1
|
||||
endi
|
||||
sql show vgroups
|
||||
if $rows != 1 then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_slave_3
|
||||
endi
|
||||
print show vgroups:
|
||||
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
|
||||
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
|
||||
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
|
||||
$dnode4Vtatus = $data4_2
|
||||
$dnode3Vtatus = $data7_2
|
||||
|
||||
if $dnode4Vtatus != slave then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_slave_3
|
||||
endi
|
||||
|
||||
print ============== step12: stop dnode2/dnode3 and remove their data dir, and restart dnode2/dnode3
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
||||
sleep 1000
|
||||
system rm -rf ../../../sim/dnode2/data
|
||||
system rm -rf ../../../sim/dnode3/data
|
||||
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
|
||||
$cnt = 0
|
||||
wait_dnode4_vgroup_master_3:
|
||||
$cnt = $cnt + 1
|
||||
if $cnt == 10 then
|
||||
return -1
|
||||
endi
|
||||
sql show vgroups
|
||||
if $rows != 1 then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_master_3
|
||||
endi
|
||||
print show vgroups:
|
||||
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
|
||||
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
|
||||
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
|
||||
$dnode4Vtatus = $data4_2
|
||||
$dnode3Vtatus = $data7_2
|
||||
|
||||
if $dnode4Vtatus != master then
|
||||
sleep 2000
|
||||
goto wait_dnode4_vgroup_master_3
|
||||
endi
|
||||
|
||||
sql select count(*) from $stb
|
||||
print data00 $data00
|
||||
if $data00 != $totalRows then
|
||||
return -1
|
||||
endi
|
|
@ -0,0 +1,32 @@
|
|||
sql connect
|
||||
|
||||
$db = db
|
||||
$stb = stb
|
||||
print =============== sync_replica_createTable_background_add.sim step0: create table and insert data
|
||||
$totalTableNum = 10
|
||||
|
||||
sql use $db
|
||||
|
||||
#sql create table $stb (ts timestamp, c1 int) tags(t1 int)
|
||||
# create table , insert data
|
||||
#$rowNum = 500
|
||||
$tblNum = 20
|
||||
$totalRows = 100
|
||||
$tsStart = 1420141600000
|
||||
$tsEnd = 0
|
||||
|
||||
$i = 10
|
||||
while $i < $tblNum
|
||||
$tb = tb . $i
|
||||
sql create table $tb using $stb tags( $i )
|
||||
|
||||
$x = 10
|
||||
$ts = $tsStart + $x
|
||||
sql insert into $tb values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x )
|
||||
|
||||
$totalRows = $totalRows + $x
|
||||
print info: inserted $x rows into $tb and totalRows: $totalRows
|
||||
$i = $i + 1
|
||||
endw
|
||||
|
||||
|
|
@ -31,6 +31,8 @@ run unique/arbitrator/offline_replica3_dropTable_online.sim
|
|||
run unique/arbitrator/replica_changeWithArbitrator.sim
|
||||
run unique/arbitrator/sync_replica2_alterTable_add.sim
|
||||
run unique/arbitrator/sync_replica2_alterTable_drop.sim
|
||||
run unique/arbitrator/sync_replica3_createTable.sim
|
||||
run unique/arbitrator/sync_replica3_dnodeChang_DropAddAlterTableDropDb.sim
|
||||
run unique/arbitrator/sync_replica2_dropDb.sim
|
||||
run unique/arbitrator/sync_replica2_dropTable.sim
|
||||
run unique/arbitrator/sync_replica3_alterTable_add.sim
|
||||
|
|
|
@ -75,7 +75,7 @@ endi
|
|||
|
||||
system_content curl -u root:taosdata -d '[{"metric": "ab1234567890123456789012345678ab1234567890123456789012345678","timestamp": 1346846400,"value": 18,"tags": {"host": "web01","group1": "1","dc": "lga"}}]' 127.0.0.1:6020/opentsdb/db/put
|
||||
print $system_content
|
||||
if $system_content != @{"errors":[{"datapoint":{"metric":"ab1234567890123456789012345678ab1234567890123456789012345678","stable":"ab1234567890123456789012345678ab1234567890123456789012345678_d_bbb","table":"ab1234567890123456789012345678ab1234567890123456789012345678_d_bbb_lga_1_web01","timestamp":1346846400,"value":18.000000,"tags":{"dc":"lga","group1":"1","host":"web01"},"status":"error","code":-2147482101}}],"failed":1,"success":0,"affected_rows":0}@ then
|
||||
if $system_content != @{"errors":[{"datapoint":{"metric":"ab1234567890123456789012345678ab1234567890123456789012345678","stable":"ab1234567890123456789012345678ab1234567890123456789012345678_d_bbb","table":"ab1234567890123456789012345678ab1234567890123456789012345678_d_bbb_lga_1_web01","timestamp":1346846400,"value":18.000000,"tags":{"dc":"lga","group1":"1","host":"web01"},"status":"error","code":-2147482101,"desc":"tsdb timestamp is out of range"}}],"failed":1,"success":0,"affected_rows":0}@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -125,7 +125,7 @@ endi
|
|||
|
||||
system_content curl -u root:taosdata -d '[{"metric": "sys_cpu","timestamp": 1346846400,"value": 18,"tags": {"host": "web01","group1": "1","group1": "1","group1": "1","group1": "1","group1": "1","dc": "lga"}}]' 127.0.0.1:6020/opentsdb/db/put
|
||||
print $system_content
|
||||
if $system_content != @{"errors":[{"datapoint":{"metric":"sys_cpu","stable":"sys_cpu_d_bbbbbbb","table":"sys_cpu_d_bbbbbbb_lga_1_1_1_1_1_web01","timestamp":1346846400,"value":18.000000,"tags":{"dc":"lga","group1":"1","group1":"1","group1":"1","group1":"1","group1":"1","host":"web01"},"status":"error","code":-2147482782}}],"failed":1,"success":0,"affected_rows":0}@ then
|
||||
if $system_content != @{"errors":[{"datapoint":{"metric":"sys_cpu","stable":"sys_cpu_d_bbbbbbb","table":"sys_cpu_d_bbbbbbb_lga_1_1_1_1_1_web01","timestamp":1346846400,"value":18.000000,"tags":{"dc":"lga","group1":"1","group1":"1","group1":"1","group1":"1","group1":"1","host":"web01"},"status":"error","code":-2147482782,"desc":"failed to create table"}}],"failed":1,"success":0,"affected_rows":0}@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
|
@ -95,7 +95,7 @@ void createDbAndTable() {
|
|||
pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
taos_free_result(pSql);
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
|
@ -114,7 +114,7 @@ void createDbAndTable() {
|
|||
pError("failed to create stable, code:%d reason:%s", taos_errno(con), taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
taos_free_result(pSql);
|
||||
|
||||
for (int64_t t = 0; t < totalTables; ++t) {
|
||||
sprintf(qstr, "create table if not exists %s%ld using %s tags(%ld)", stableName, t, stableName, t);
|
||||
|
@ -124,7 +124,7 @@ void createDbAndTable() {
|
|||
pError("failed to create table %s%" PRId64 ", reason:%s", stableName, t, taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
taos_free_result(pSql);
|
||||
}
|
||||
} else {
|
||||
for (int64_t t = 0; t < totalTables; ++t) {
|
||||
|
@ -140,7 +140,7 @@ void createDbAndTable() {
|
|||
pError("failed to create table %s%ld, reason:%s", stableName, t, taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
taos_free_result(pSql);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -148,6 +148,7 @@ void createDbAndTable() {
|
|||
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
float seconds = (et - st) / 1000.0 / 1000.0;
|
||||
pPrint("%.1f seconds to create %ld tables, speed:%.1f", seconds, totalTables, totalTables / seconds);
|
||||
taos_close(con);
|
||||
}
|
||||
|
||||
void insertData() {
|
||||
|
@ -257,7 +258,7 @@ void *syncTest(void *param) {
|
|||
pError("thread:%d, failed to insert table:%s%ld row:%ld, reason:%s", pInfo->threadIndex, pInfo->stableName,
|
||||
table, row, taos_errstr(con));
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
taos_free_result(pSql);
|
||||
|
||||
// "insert into"
|
||||
len = sprintf(sql, "%s", inserStr);
|
||||
|
|
Loading…
Reference in New Issue