Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/vnode_compact

This commit is contained in:
Hongze Cheng 2023-01-05 13:57:55 +08:00
commit a046dbb02b
49 changed files with 1138 additions and 286 deletions

View File

@ -220,6 +220,7 @@ DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
DLL_EXPORT int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_INFO *dbInfo);
DLL_EXPORT int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId);
DLL_EXPORT int taos_get_tables_vgId(TAOS *taos, const char *db, const char *table[], int tableNum, int *vgId);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);

View File

@ -210,6 +210,9 @@ int32_t catalogGetCachedTableMeta(SCatalog* pCtg, const SName* pTableName, STabl
int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogGetTablesHashVgId(SCatalog* pCtg, SRequestConnInfo* pConn, int32_t acctId, const char* pDb, const char* pTableName[],
int32_t tableNum, int32_t *vgId);
int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists);
int32_t catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta);

View File

@ -216,6 +216,7 @@ int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
void qStreamCloseTsdbReader(void* task);
#ifdef __cplusplus
}

View File

@ -116,6 +116,7 @@ extern "C" {
#include "osTimer.h"
#include "osTimezone.h"
#include "taoserror.h"
#include "tlog.h"
#ifdef __cplusplus
}

View File

@ -182,6 +182,7 @@ typedef struct {
int8_t offset[OTD_JSON_FIELDS_NUM];
SSmlLineInfo *lines; // element is SSmlLineInfo
bool parseJsonByLib;
SArray *tagJsonArray;
//
SArray *preLineTagKV;
@ -230,6 +231,7 @@ int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32
int32_t smlClearForRerun(SSmlHandle *info);
int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg);
uint8_t smlGetTimestampLen(int64_t num);
void clearColValArray(SArray* pCols);
int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);

View File

@ -802,7 +802,7 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c
tstrerror(code));
if (code == TSDB_CODE_SUCCESS) {
//pWrapper->pCatalogReq->forceUpdate = false;
// pWrapper->pCatalogReq->forceUpdate = false;
code = qContinueParseSql(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery);
}
@ -831,8 +831,8 @@ void continueInsertFromCsv(SSqlCallbackWrapper *pWrapper, SRequestObj *pRequest)
tstrerror(code), pWrapper->pRequest->requestId);
destorySqlCallbackWrapper(pWrapper);
terrno = code;
pWrapper->pRequest->code = code;
pWrapper->pRequest->body.queryFp(pWrapper->pRequest->body.param, pWrapper->pRequest, code);
pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
}
@ -1167,6 +1167,54 @@ _return:
return code;
}
int taos_get_tables_vgId(TAOS *taos, const char *db, const char *table[], int tableNum, int *vgId) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return terrno;
}
if (NULL == db || NULL == table || NULL == vgId || tableNum <= 0) {
tscError("invalid input param, db:%p, table:%p, vgId:%p, tbNum:%d", db, table, vgId, tableNum);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
return terrno;
}
int64_t connId = *(int64_t *)taos;
SRequestObj *pRequest = NULL;
char *sql = "taos_get_table_vgId";
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) {
return terrno;
}
pRequest->syncQuery = true;
STscObj *pTscObj = pRequest->pTscObj;
SCatalog *pCtg = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
if (code != TSDB_CODE_SUCCESS) {
goto _return;
}
SRequestConnInfo conn = {
.pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
code = catalogGetTablesHashVgId(pCtg, &conn, pTscObj->acctId, db, table, tableNum, vgId);
if (code) {
goto _return;
}
_return:
terrno = code;
destroyRequest(pRequest);
return code;
}
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
@ -1331,8 +1379,7 @@ int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fiel
}
// let stmt to reclaim TAOS_FIELD_E that was allocated by `taos_stmt_get_tag_fields`/`taos_stmt_get_col_fields`
void taos_stmt_reclaim_fields(TAOS_STMT *stmt, TAOS_FIELD_E *fields)
{
void taos_stmt_reclaim_fields(TAOS_STMT *stmt, TAOS_FIELD_E *fields) {
(void)stmt;
if (!fields) return;
taosMemoryFree(fields);

View File

@ -1024,6 +1024,16 @@ static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
taosMemoryFree(tag);
}
void clearColValArray(SArray* pCols) {
int32_t num = taosArrayGetSize(pCols);
for (int32_t i = 0; i < num; ++i) {
SColVal* pCol = taosArrayGet(pCols, i);
if (TSDB_DATA_TYPE_NCHAR == pCol->type) {
taosMemoryFreeClear(pCol->value.pData);
}
}
}
void smlDestroyInfo(SSmlHandle *info) {
if (!info) return;
qDestroyQuery(info->pQuery);
@ -1053,6 +1063,12 @@ void smlDestroyInfo(SSmlHandle *info) {
// destroy info->pVgHash
taosHashCleanup(info->pVgHash);
for(int i = 0; i< taosArrayGetSize(info->tagJsonArray); i++){
cJSON *tags = (cJSON *)taosArrayGetP(info->tagJsonArray, i);
cJSON_Delete(tags);
}
taosArrayDestroy(info->tagJsonArray);
taosArrayDestroy(info->preLineTagKV);
taosArrayDestroy(info->maxTagKVs);
taosArrayDestroy(info->preLineColKV);
@ -1067,6 +1083,7 @@ void smlDestroyInfo(SSmlHandle *info) {
taosMemoryFree(info->lines);
}
cJSON_Delete(info->root);
taosMemoryFreeClear(info);
}
@ -1090,6 +1107,7 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
info->pQuery = smlInitHandle();
info->dataFormat = true;
info->tagJsonArray = taosArrayInit(8, POINTER_BYTES);
info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
info->maxTagKVs = taosArrayInit(8, sizeof(SSmlKv));
info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));

View File

@ -335,6 +335,9 @@ int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset){
(*start)++;
}
}
if(*(*start) == '\0'){
break;
}
if(*(*start) == '}'){
(*start)++;
break;
@ -655,14 +658,14 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
sMeta = smlBuildSTableMeta(info->dataFormat);
sMeta->tableMeta = pTableMeta;
nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL);
}
info->currSTableMeta = sMeta->tableMeta;
@ -923,9 +926,6 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
cJSON *tsJson = NULL;
cJSON *valueJson = NULL;
cJSON *tagsJson = NULL;
char* rootStr = cJSON_PrintUnformatted(root);
uError("rootStr:%s", rootStr);
taosMemoryFree(rootStr);
int32_t size = cJSON_GetArraySize(root);
// outmost json fields has to be exactly 4
@ -956,6 +956,7 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
}
// Parse tags
bool needFree = info->dataFormat;
elements->tags = cJSON_PrintUnformatted(tagsJson);
elements->tagsLen = strlen(elements->tags);
if(is_same_child_table_telnet(elements, &info->preLine) != 0) {
@ -968,7 +969,7 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
}
}
if(info->dataFormat){
if(needFree){
taosMemoryFree(elements->tags);
elements->tags = NULL;
}
@ -994,6 +995,7 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildRow(info->currTableDataCtx);
}
clearColValArray(info->currTableDataCtx->pValues);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
@ -1095,6 +1097,11 @@ static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *
if(unlikely(**start == '\0' && elements->measure == NULL)) return TSDB_CODE_SUCCESS;
if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen))) {
smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen};
if (elements->colsLen == 0 || smlParseValue(&kv, &info->msgBuf) != TSDB_CODE_SUCCESS) {
uError("SML:cols invalidate:%s", elements->cols);
@ -1112,8 +1119,8 @@ static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *
return TSDB_CODE_TSC_INVALID_JSON;
}
taosArrayPush(info->tagJsonArray, &tagsJson);
ret = smlParseTagsFromJSON(info, tagsJson, elements);
cJSON_free(tagsJson);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
return ret;
@ -1141,6 +1148,7 @@ static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildRow(info->currTableDataCtx);
}
clearColValArray(info->currTableDataCtx->pValues);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;

View File

@ -151,14 +151,14 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd,
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
sMeta = smlBuildSTableMeta(info->dataFormat);
sMeta->tableMeta = pTableMeta;
nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta, NULL);
}
info->currSTableMeta = sMeta->tableMeta;
@ -353,14 +353,14 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd,
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
sMeta = smlBuildSTableMeta(info->dataFormat);
sMeta->tableMeta = pTableMeta;
nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta, NULL);
}
info->currSTableMeta = sMeta->tableMeta;
@ -646,6 +646,7 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
if(info->dataFormat){
smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
smlBuildRow(info->currTableDataCtx);
clearColValArray(info->currTableDataCtx->pValues);
}else{
taosArraySet(elements->colArray, 0, &kv);
}

View File

@ -86,14 +86,14 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
sMeta = smlBuildSTableMeta(info->dataFormat);
sMeta->tableMeta = pTableMeta;
nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL);
}
info->currSTableMeta = sMeta->tableMeta;
@ -324,6 +324,7 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildRow(info->currTableDataCtx);
}
clearColValArray(info->currTableDataCtx->pValues);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;

View File

@ -90,10 +90,8 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
SName* toName(int32_t acctId, const char* pDbName, const char* pTableName, SName* pName) {
pName->type = TSDB_TABLE_NAME_T;
pName->acctId = acctId;
memset(pName->dbname, 0, TSDB_DB_NAME_LEN);
strncpy(pName->dbname, pDbName, TSDB_DB_NAME_LEN - 1);
memset(pName->tname, 0, TSDB_TABLE_NAME_LEN);
strncpy(pName->tname, pTableName, TSDB_TABLE_NAME_LEN - 1);
snprintf(pName->dbname, sizeof(pName->dbname), "%s", pDbName);
snprintf(pName->tname, sizeof(pName->tname), "%s", pTableName);
return pName;
}
@ -316,7 +314,7 @@ static int compareKv(const void* p1, const void* p2) {
void buildChildTableName(RandTableName* rName) {
SStringBuilder sb = {0};
taosStringBuilderAppendStringLen(&sb, rName->stbFullName, rName->stbFullNameLen);
if(sb.buf == NULL) return;
if (sb.buf == NULL) return;
taosArraySort(rName->tags, compareKv);
for (int j = 0; j < taosArrayGetSize(rName->tags); ++j) {
taosStringBuilderAppendChar(&sb, ',');

View File

@ -54,7 +54,7 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode* pVnode);
void vnodeSyncCheckTimeout(SVnode *pVnode);
void vnodeClose(SVnode *pVnode);
int32_t vnodeStart(SVnode *pVnode);
@ -175,7 +175,8 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock* pDataBlock, bool *allHave);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
void tsdbReleaseDataBlock(STsdbReader *pReader);
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
@ -185,7 +186,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void **pReader, const char* idstr);
uint64_t suid, void **pReader, const char *idstr);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);

View File

@ -65,6 +65,7 @@ typedef struct SSmaInfo SSmaInfo;
typedef struct SBlockCol SBlockCol;
typedef struct SVersionRange SVersionRange;
typedef struct SLDataIter SLDataIter;
typedef struct SQueryNode SQueryNode;
typedef struct SDiskCol SDiskCol;
typedef struct SDiskData SDiskData;
typedef struct SDiskDataBuilder SDiskDataBuilder;
@ -210,11 +211,13 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
uint8_t **ppBuf);
// tsdbMemTable ==============================================================================================
// SMemTable
typedef int32_t (*_tsdb_reseek_func_t)(void *pQHandle);
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy(SMemTable *pMemTable);
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid);
void tsdbRefMemTable(SMemTable *pMemTable);
void tsdbUnrefMemTable(SMemTable *pMemTable);
int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode);
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode);
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
// STbDataIter
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
@ -292,8 +295,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
// tsdbRead.c ==============================================================================================
int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap, const char *id);
void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap, const char *id);
int32_t tsdbTakeReadSnap(STsdbReader *pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap **ppSnap);
void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap);
// tsdbMerge.c ==============================================================================================
int32_t tsdbMerge(STsdb *pTsdb);
@ -368,11 +371,20 @@ struct STbData {
STbData *next;
};
struct SQueryNode {
SQueryNode *pNext;
SQueryNode **ppNext;
void *pQHandle;
_tsdb_reseek_func_t reseek;
};
struct SMemTable {
SRWLatch latch;
STsdb *pTsdb;
SVBufPool *pPool;
volatile int32_t nRef;
int64_t minVer;
int64_t maxVer;
TSKEY minKey;
TSKEY maxKey;
int64_t nRow;
@ -382,6 +394,7 @@ struct SMemTable {
int32_t nBucket;
STbData **aBucket;
};
SQueryNode qList;
};
struct TSDBROW {
@ -607,9 +620,11 @@ struct SDelFWriter {
};
struct STsdbReadSnap {
SMemTable *pMem;
SMemTable *pIMem;
STsdbFS fs;
SMemTable *pMem;
SQueryNode *pNode;
SMemTable *pIMem;
SQueryNode *pINode;
STsdbFS fs;
};
struct SDataFWriter {
@ -729,6 +744,9 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
// tsdbCache ==============================================================================================
typedef struct SCacheRowsReader {
STsdb *pTsdb;
SVersionRange verRange;
TdThreadMutex readerMutex;
SVnode *pVnode;
STSchema *pSchema;
uint64_t uid;

View File

@ -87,7 +87,8 @@ typedef struct SCommitInfo SCommitInfo;
#define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA2_DIR "rsma2"
#define VND_INFO_FNAME "vnode.json"
#define VNODE_BUF_POOL_SEG 1 // TODO: change parameter here for sync/async commit
#define VND_INFO_FNAME "vnode.json"
// vnd.h

View File

@ -725,6 +725,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
taosWLockLatch(&pTq->pushLock);
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {
@ -791,6 +793,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req);
// todo lock
tqDebug("vgId:%d, tq process sub req %s", pTq->pVnode->config.vgId, req.subKey);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
if (req.oldConsumerId != -1) {
@ -881,6 +886,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);
taosMemoryFree(req.qmsg);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pHandle->execHandle.task);
}
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
}
// close handle

View File

@ -109,6 +109,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
p->type = type;
p->pVnode = pVnode;
p->pTsdb = p->pVnode->pTsdb;
p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
p->numOfCols = numOfCols;
p->suid = suid;
@ -145,6 +147,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
}
p->idstr = taosMemoryStrDup(idstr);
taosThreadMutexInit(&p->readerMutex, NULL);
*pReader = p;
return TSDB_CODE_SUCCESS;
@ -164,7 +167,9 @@ void* tsdbCacherowsReaderClose(void* pReader) {
destroyLastBlockLoadInfo(p->pLoadInfo);
taosMemoryFree((void*) p->idstr);
taosMemoryFree((void*)p->idstr);
taosThreadMutexDestroy(&p->readerMutex);
taosMemoryFree(pReader);
return NULL;
}
@ -199,6 +204,19 @@ static void freeItem(void* pItem) {
}
}
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
int32_t code = 0;
SCacheRowsReader* pReader = pQHandle;
taosThreadMutexLock(&pReader->readerMutex);
// pause current reader's state if not paused, save ts & version for resuming
// just wait for the big all tables' snapshot untaking for now
taosThreadMutexUnlock(&pReader->readerMutex);
return code;
}
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
if (pReader == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
@ -241,7 +259,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
taosArrayPush(pLastCols, &p);
}
tsdbTakeReadSnap(pr->pVnode->pTsdb, &pr->pReadSnap, "cache-l");
taosThreadMutexLock(&pr->readerMutex);
tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
pr->pDataFReader = NULL;
pr->pDataFReaderLast = NULL;
@ -355,8 +374,9 @@ _end:
tsdbDataFReaderClose(&pr->pDataFReaderLast);
tsdbDataFReaderClose(&pr->pDataFReader);
tsdbUntakeReadSnap(pr->pVnode->pTsdb, pr->pReadSnap, "cache-l");
resetLastBlockLoadInfo(pr->pLoadInfo);
tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap);
taosThreadMutexUnlock(&pr->readerMutex);
for (int32_t j = 0; j < pr->numOfCols; ++j) {
taosMemoryFree(pRes[j]);

View File

@ -175,7 +175,7 @@ int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) {
pTsdb->imem = NULL;
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbUnrefMemTable(pMemTable);
tsdbUnrefMemTable(pMemTable, NULL);
goto _exit;
}
@ -1664,7 +1664,7 @@ int32_t tsdbFinishCommit(STsdb *pTsdb) {
// unlock
taosThreadRwlockUnlock(&pTsdb->rwLock);
if (pMemTable) {
tsdbUnrefMemTable(pMemTable);
tsdbUnrefMemTable(pMemTable, NULL);
}
_exit:

View File

@ -50,6 +50,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
pMemTable->pTsdb = pTsdb;
pMemTable->pPool = pTsdb->pVnode->inUse;
pMemTable->nRef = 1;
pMemTable->minVer = VERSION_MAX;
pMemTable->maxVer = VERSION_MIN;
pMemTable->minKey = TSKEY_MAX;
pMemTable->maxKey = TSKEY_MIN;
pMemTable->nRow = 0;
@ -62,6 +64,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
taosMemoryFree(pMemTable);
goto _err;
}
pMemTable->qList.pNext = &pMemTable->qList;
pMemTable->qList.ppNext = &pMemTable->qList.pNext;
vnodeBufPoolRef(pMemTable->pPool);
*ppMemTable = pMemTable;
@ -146,6 +150,10 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmi
}
if (code) goto _err;
// update
pMemTable->minVer = TMIN(pMemTable->minVer, version);
pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
return code;
_err:
@ -197,6 +205,8 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
}
pMemTable->nDel++;
pMemTable->minVer = TMIN(pMemTable->minVer, version);
pMemTable->maxVer = TMIN(pMemTable->maxVer, version);
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);
@ -237,7 +247,6 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
if (pIter) {
taosMemoryFree(pIter);
}
return NULL;
}
@ -740,16 +749,45 @@ _exit:
int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; }
void tsdbRefMemTable(SMemTable *pMemTable) {
int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode) {
int32_t code = 0;
int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
ASSERT(nRef > 0);
/*
// register handle (todo: take concurrency in consideration)
*ppNode = taosMemoryMalloc(sizeof(SQueryNode));
if (*ppNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
(*ppNode)->pQHandle = pQHandle;
(*ppNode)->reseek = reseek;
(*ppNode)->pNext = pMemTable->qList.pNext;
(*ppNode)->ppNext = &pMemTable->qList.pNext;
pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext;
pMemTable->qList.pNext = *ppNode;
*/
_exit:
return code;
}
void tsdbUnrefMemTable(SMemTable *pMemTable) {
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) {
int32_t code = 0;
/*
// unregister handle (todo: take concurrency in consideration)
if (pNode) {
pNode->pNext->ppNext = pNode->ppNext;
*pNode->ppNext = pNode->pNext;
taosMemoryFree(pNode);
}
*/
int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1);
if (nRef == 0) {
tsdbMemTableDestroy(pMemTable);
}
return code;
}
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
@ -789,3 +827,29 @@ SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) {
_exit:
return aTbDataP;
}
int32_t tsdbRecycleMemTable(SMemTable *pMemTable) {
int32_t code = 0;
SQueryNode *pNode = pMemTable->qList.pNext;
while (1) {
ASSERT(pNode != &pMemTable->qList);
SQueryNode *pNextNode = pNode->pNext;
if (pNextNode == &pMemTable->qList) {
code = (*pNode->reseek)(pNode->pQHandle);
if (code) goto _exit;
break;
} else {
code = (*pNode->reseek)(pNode->pQHandle);
if (code) goto _exit;
pNode = pMemTable->qList.pNext;
ASSERT(pNode == pNextNode);
}
}
// NOTE: Take care here, pMemTable is destroyed
_exit:
return code;
}

View File

@ -156,6 +156,9 @@ typedef struct SBlockInfoBuf {
struct STsdbReader {
STsdb* pTsdb;
SVersionRange verRange;
TdThreadMutex readerMutex;
bool suspended;
uint64_t suid;
int16_t order;
bool freeBlock;
@ -173,10 +176,10 @@ struct STsdbReader {
SDataFReader* pFileReader; // the file reader
SDelFReader* pDelFReader; // the del file reader
SArray* pDelIdx; // del file block index;
SVersionRange verRange;
SBlockInfoBuf blockInfoBuf;
int32_t step;
STsdbReader* innerReader[2];
// SVersionRange verRange;
SBlockInfoBuf blockInfoBuf;
int32_t step;
STsdbReader* innerReader[2];
};
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
@ -399,9 +402,7 @@ static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
taosHashCleanup(pTableMap);
}
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
return pWindow->skey > pWindow->ekey;
}
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { return pWindow->skey > pWindow->ekey; }
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
@ -635,6 +636,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
setColumnIdSlotList(&pReader->suppInfo, pCond->colList, pCond->pSlotList, pCond->numOfCols);
taosThreadMutexInit(&pReader->readerMutex, NULL);
*ppReader = pReader;
return code;
@ -729,12 +732,25 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
sizeInDisk += pScanInfo->mapData.nData;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
STimeWindow w = pReader->window;
if (ASCENDING_TRAVERSE(pReader->order)) {
w.skey = pScanInfo->lastKey + step;
} else {
w.ekey = pScanInfo->lastKey + step;
}
if (isEmptyQueryTimeWindow(&w)) {
continue;
}
SDataBlk block = {0};
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);
// 1. time range check
if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
// if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
if (block.minKey.ts > w.ekey || block.maxKey.ts < w.skey) {
continue;
}
@ -915,7 +931,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
int32_t step = asc ? 1 : -1;
// make sure it is aligned to 8bit, the allocated memory address is aligned to 256bit
// ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);
// ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);
// 1. copy data in a batch model
memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes);
@ -2170,9 +2186,11 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
TSDBKEY startKey = {0};
if (ASCENDING_TRAVERSE(pReader->order)) {
startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
// startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer};
} else {
startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
// startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer};
}
int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));
@ -2295,7 +2313,7 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLas
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
return false; // this is an invalid result.
return false; // this is an invalid result.
}
return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
}
@ -2451,7 +2469,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
while (1) {
bool hasBlockData = false;
{
while (pBlockData->nRow > 0 && pBlockData->uid == pBlockScanInfo->uid) { // find the first qualified row in data block
while (pBlockData->nRow > 0 &&
pBlockData->uid == pBlockScanInfo->uid) { // find the first qualified row in data block
if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
hasBlockData = true;
break;
@ -2882,9 +2901,16 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
}
// set the correct start position in case of the first/last file block, according to the query time window
void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
int64_t lastKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MIN : INT64_MAX;
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
if (pBlockInfo) {
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pScanInfo) {
lastKey = pScanInfo->lastKey;
}
}
SReaderStatus* pStatus = &pReader->status;
SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
@ -2892,6 +2918,7 @@ void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
pDumpInfo->totalRows = pBlock->nRow;
pDumpInfo->allDumped = false;
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
pDumpInfo->lastKey = lastKey;
}
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
@ -3096,7 +3123,8 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
return false;
} else if (pKey->ts == last->ts) {
TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer && prev->version >= pVerRange->minVer);
return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer &&
prev->version >= pVerRange->minVer);
}
} else {
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
@ -3285,7 +3313,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
*state = CHECK_FILEBLOCK_QUIT;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
bool loadNeighbor = true;
bool loadNeighbor = true;
int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor);
if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
@ -3826,40 +3854,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
goto _err;
}
if (numOfTables > 0) {
code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
code = doOpenReaderImpl(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
STsdbReader* pPrevReader = pReader->innerReader[0];
STsdbReader* pNextReader = pReader->innerReader[1];
// we need only one row
pPrevReader->capacity = 1;
pPrevReader->status.pTableMap = pReader->status.pTableMap;
pPrevReader->pSchema = pReader->pSchema;
pPrevReader->pMemSchema = pReader->pMemSchema;
pPrevReader->pReadSnap = pReader->pReadSnap;
pNextReader->capacity = 1;
pNextReader->status.pTableMap = pReader->status.pTableMap;
pNextReader->pSchema = pReader->pSchema;
pNextReader->pMemSchema = pReader->pMemSchema;
pNextReader->pReadSnap = pReader->pReadSnap;
code = doOpenReaderImpl(pPrevReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
pReader->suspended = true;
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code;
@ -3932,7 +3927,10 @@ void tsdbReaderClose(STsdbReader* pReader) {
pReader->pDelIdx = NULL;
}
tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);
qTrace("tsdb/reader: %p, untake snapshot", pReader);
tsdbUntakeReadSnap(pReader, pReader->pReadSnap);
taosThreadMutexDestroy(&pReader->readerMutex);
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
SIOCostSummary* pCost = &pReader->cost;
@ -3964,9 +3962,156 @@ void tsdbReaderClose(STsdbReader* pReader) {
if (pReader->pMemSchema != pReader->pSchema) {
taosMemoryFree(pReader->pMemSchema);
}
taosMemoryFreeClear(pReader);
}
int32_t tsdbReaderSuspend(STsdbReader* pReader) {
int32_t code = 0;
// save reader's base state & reset top state to be reconstructed from base state
SReaderStatus* pStatus = &pReader->status;
STableBlockScanInfo* pBlockScanInfo = NULL;
if (pStatus->loadFromFile) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
if (pBlockInfo != NULL) {
pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pBlockScanInfo == NULL) {
code = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
goto _err;
}
} else {
pBlockScanInfo = *pStatus->pTableIter;
}
tsdbDataFReaderClose(&pReader->pFileReader);
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo* p = NULL;
while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
p->iterInit = false;
p->iiter.hasVal = false;
if (p->iter.iter != NULL) {
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
}
p->delSkyline = taosArrayDestroy(p->delSkyline);
// p->lastKey = ts;
}
} else {
pBlockScanInfo = *pStatus->pTableIter;
if (pBlockScanInfo) {
// save lastKey to restore memory iterator
STimeWindow w = pReader->pResBlock->info.window;
pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey;
// reset current current table's data block scan info,
pBlockScanInfo->iterInit = false;
// pBlockScanInfo->iiter.hasVal = false;
if (pBlockScanInfo->iter.iter != NULL) {
pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter);
}
if (pBlockScanInfo->iiter.iter != NULL) {
pBlockScanInfo->iiter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iiter.iter);
}
pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList);
tMapDataClear(&pBlockScanInfo->mapData);
// TODO: keep skyline for reuse
pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline);
}
}
tsdbUntakeReadSnap(pReader, pReader->pReadSnap);
pReader->suspended = true;
tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo->uid, pReader->idStr);
return code;
_err:
tsdbError("failed to suspend data reader, code:%s %s", tstrerror(code), pReader->idStr);
return code;
}
static int32_t tsdbSetQueryReseek(void* pQHandle) {
int32_t code = 0;
STsdbReader* pReader = pQHandle;
taosThreadMutexLock(&pReader->readerMutex);
if (pReader->suspended) {
taosThreadMutexUnlock(&pReader->readerMutex);
return code;
}
tsdbReaderSuspend(pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
return code;
}
int32_t tsdbReaderResume(STsdbReader* pReader) {
int32_t code = 0;
STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter;
// restore reader's state
// task snapshot
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
if (numOfTables > 0) {
qTrace("tsdb/reader: %p, take snapshot", pReader);
code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
code = doOpenReaderImpl(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
STsdbReader* pPrevReader = pReader->innerReader[0];
STsdbReader* pNextReader = pReader->innerReader[1];
// we need only one row
pPrevReader->capacity = 1;
pPrevReader->status.pTableMap = pReader->status.pTableMap;
pPrevReader->pSchema = pReader->pSchema;
pPrevReader->pMemSchema = pReader->pMemSchema;
pPrevReader->pReadSnap = pReader->pReadSnap;
pNextReader->capacity = 1;
pNextReader->status.pTableMap = pReader->status.pTableMap;
pNextReader->pSchema = pReader->pSchema;
pNextReader->pMemSchema = pReader->pMemSchema;
pNextReader->pReadSnap = pReader->pReadSnap;
code = doOpenReaderImpl(pPrevReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
pReader->suspended = false;
tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader,
pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr);
return code;
_err:
tsdbError("failed to resume data reader, code:%s %s", tstrerror(code), pReader->idStr);
return code;
}
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
// cleanup the data that belongs to the previous data block
SSDataBlock* pBlock = pReader->pResBlock;
@ -4000,10 +4145,24 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
return false;
}
SReaderStatus* pStatus = &pReader->status;
qTrace("tsdb/read: %p, take read mutex", pReader);
taosThreadMutexLock(&pReader->readerMutex);
if (pReader->suspended) {
tsdbReaderResume(pReader);
}
if (pReader->innerReader[0] != NULL && pReader->step == 0) {
bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
pReader->step = EXTERNAL_ROWS_PREV;
if (ret) {
pStatus = &pReader->innerReader[0]->status;
if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
}
return ret;
}
}
@ -4022,6 +4181,11 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
bool ret = doTsdbNextDataBlock(pReader);
if (ret) {
if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
}
return ret;
}
@ -4036,10 +4200,19 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
pReader->step = EXTERNAL_ROWS_NEXT;
if (ret1) {
pStatus = &pReader->innerReader[1]->status;
if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
}
return ret1;
}
}
qTrace("tsdb/read: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
return false;
}
@ -4175,12 +4348,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
}
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
if (pStatus->composedDataBlock) {
return pReader->pResBlock;
}
SReaderStatus* pStatus = &pReader->status;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
STableBlockScanInfo* pBlockScanInfo =
*(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
@ -4202,20 +4370,50 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
return pReader->pResBlock;
}
void tsdbReleaseDataBlock(STsdbReader* pReader) {
// SReaderStatus* pStatus = &pReader->status;
// if (!pStatus->composedDataBlock) {
taosThreadMutexUnlock(&pReader->readerMutex);
//}
}
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
STsdbReader* pTReader = pReader;
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
if (pReader->step == EXTERNAL_ROWS_PREV) {
return doRetrieveDataBlock(pReader->innerReader[0]);
pTReader = pReader->innerReader[0];
} else if (pReader->step == EXTERNAL_ROWS_NEXT) {
return doRetrieveDataBlock(pReader->innerReader[1]);
pTReader = pReader->innerReader[1];
}
}
return doRetrieveDataBlock(pReader);
SReaderStatus* pStatus = &pTReader->status;
if (pStatus->composedDataBlock) {
return pTReader->pResBlock;
}
SSDataBlock* ret = doRetrieveDataBlock(pTReader);
qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
return ret;
}
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
SReaderStatus* pStatus = &pReader->status;
qTrace("tsdb/read: %p, take read mutex", pReader);
taosThreadMutexLock(&pReader->readerMutex);
if (pReader->suspended) {
tsdbReaderResume(pReader);
}
taosThreadMutexUnlock(&pReader->readerMutex);
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap);
return TSDB_CODE_SUCCESS;
}
@ -4343,13 +4541,18 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
int64_t rows = 0;
SReaderStatus* pStatus = &pReader->status;
taosThreadMutexLock(&pReader->readerMutex);
if (pReader->suspended) {
tsdbReaderResume(pReader);
}
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
while (pStatus->pTableIter != NULL) {
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
STbData* d = NULL;
if (pReader->pTsdb->mem != NULL) {
if (pReader->pReadSnap->pMem != NULL) {
d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
if (d != NULL) {
rows += tsdbGetNRowsInTbData(d);
@ -4357,7 +4560,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
}
STbData* di = NULL;
if (pReader->pTsdb->imem != NULL) {
if (pReader->pReadSnap->pIMem != NULL) {
di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
if (di != NULL) {
rows += tsdbGetNRowsInTbData(di);
@ -4368,6 +4571,8 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
}
taosThreadMutexUnlock(&pReader->readerMutex);
return rows;
}
@ -4409,8 +4614,10 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
return TSDB_CODE_SUCCESS;
}
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
int32_t code = 0;
int32_t tsdbTakeReadSnap(STsdbReader* pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap** ppSnap) {
int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb;
SVersionRange* pRange = &pReader->verRange;
// alloc
*ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
@ -4427,15 +4634,14 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr
}
// take snapshot
(*ppSnap)->pMem = pTsdb->mem;
(*ppSnap)->pIMem = pTsdb->imem;
if ((*ppSnap)->pMem) {
tsdbRefMemTable((*ppSnap)->pMem);
if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) {
tsdbRefMemTable(pTsdb->mem, pReader, reseek, &(*ppSnap)->pNode);
(*ppSnap)->pMem = pTsdb->mem;
}
if ((*ppSnap)->pIMem) {
tsdbRefMemTable((*ppSnap)->pIMem);
if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) {
tsdbRefMemTable(pTsdb->imem, pReader, reseek, &(*ppSnap)->pINode);
(*ppSnap)->pIMem = pTsdb->imem;
}
// fs
@ -4452,23 +4658,25 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr
goto _exit;
}
tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
_exit:
return code;
}
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) {
STsdb* pTsdb = pReader->pTsdb;
if (pSnap) {
if (pSnap->pMem) {
tsdbUnrefMemTable(pSnap->pMem);
tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode);
}
if (pSnap->pIMem) {
tsdbUnrefMemTable(pSnap->pIMem);
tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode);
}
tsdbFSUnref(pTsdb, &pSnap->fs);
taosMemoryFree(pSnap);
}
tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}

View File

@ -83,8 +83,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowK
tsdbError("vgId:%d, table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64 " row key %" PRId64,
TD_VID(pTsdb->pVnode), uid, now, minKey, maxKey, rowKey);
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
return -1;
return TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
}
return 0;
@ -164,15 +163,13 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
#endif
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
ASSERT(pMsg != NULL);
int32_t code = 0;
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
TSKEY now = taosGetTimestamp(pCfg->precision);
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
int32_t size = taosArrayGetSize(pMsg->aSubmitTbData);
terrno = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < size; ++i) {
SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i);
if (pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
@ -182,8 +179,8 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
int32_t nRows = aColData[0].nVal;
TSKEY *aKey = (TSKEY *)aColData[0].pData;
for (int32_t r = 0; r < nRows; ++r) {
if (tsdbCheckRowRange(pTsdb, pData->uid, aKey[r], minKey, maxKey, now) < 0) {
return -1;
if ((code = tsdbCheckRowRange(pTsdb, pData->uid, aKey[r], minKey, maxKey, now)) < 0) {
goto _exit;
}
}
}
@ -191,13 +188,13 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
int32_t nRows = taosArrayGetSize(pData->aRowP);
for (int32_t r = 0; r < nRows; ++r) {
SRow *pRow = (SRow *)taosArrayGetP(pData->aRowP, r);
if (tsdbCheckRowRange(pTsdb, pData->uid, pRow->ts, minKey, maxKey, now) < 0) {
return -1;
if ((code = tsdbCheckRowRange(pTsdb, pData->uid, pRow->ts, minKey, maxKey, now)) < 0) {
goto _exit;
}
}
}
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
_exit:
return code;
}

View File

@ -74,7 +74,7 @@ int vnodeOpenBufPool(SVnode *pVnode) {
ASSERT(pVnode->pPool == NULL);
for (int i = 0; i < 3; i++) {
for (int i = 0; i < VNODE_BUF_POOL_SEG; i++) {
// create pool
if (vnodeBufPoolCreate(pVnode, size, &pPool)) {
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));

View File

@ -776,6 +776,7 @@ void ctgFreeHandleImpl(SCatalog* pCtg);
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup);
int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx,
char* dbFName, SArray* pNames, bool update);
int32_t ctgGetVgIdsFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, char* dbFName, const char* pTbs[], int32_t tbNum, int32_t* vgId);
void ctgResetTbMetaTask(SCtgTask* pTask);
void ctgFreeDbCache(SCtgDBCache* dbCache);
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);

View File

@ -551,6 +551,37 @@ _return:
CTG_RET(code);
}
int32_t ctgGetTbsHashVgId(SCatalog* pCtg, SRequestConnInfo* pConn, int32_t acctId, const char* pDb, const char* pTbs[], int32_t tbNum, int32_t* vgId) {
if (IS_SYS_DBNAME(pDb)) {
ctgError("no valid vgInfo for db, dbname:%s", pDb);
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
SCtgDBCache* dbCache = NULL;
int32_t code = 0;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%s", acctId, pDb);
SDBVgInfo* vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &vgInfo, NULL));
CTG_ERR_JRET(ctgGetVgIdsFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, dbFName, pTbs, tbNum, vgId));
_return:
if (dbCache) {
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
if (vgInfo) {
freeVgInfo(vgInfo);
}
CTG_RET(code);
}
int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) {
int32_t code = 0;
char db[TSDB_DB_FNAME_LEN] = {0};
@ -1141,6 +1172,13 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const
CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup, NULL));
}
int32_t catalogGetTablesHashVgId(SCatalog* pCtg, SRequestConnInfo* pConn, int32_t acctId, const char* pDb, const char* pTableName[],
int32_t tableNum, int32_t *vgId) {
CTG_API_ENTER();
CTG_API_LEAVE(ctgGetTbsHashVgId(pCtg, pConn, acctId, pDb, pTableName, tableNum, vgId));
}
int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists) {
CTG_API_ENTER();

View File

@ -986,6 +986,43 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
CTG_RET(code);
}
int32_t ctgGetVgIdsFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, char* dbFName, const char* pTbs[], int32_t tbNum, int32_t* vgId) {
int32_t code = 0;
CTG_ERR_RET(ctgMakeVgArray(dbInfo));
int32_t vgNum = taosArrayGetSize(dbInfo->vgArray);
if (vgNum <= 0) {
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
}
SVgroupInfo* vgInfo = NULL;
char tbFullName[TSDB_TABLE_FNAME_LEN];
snprintf(tbFullName, sizeof(tbFullName), "%s.", dbFName);
int32_t offset = strlen(tbFullName);
for (int32_t i = 0; i < tbNum; ++i) {
snprintf(tbFullName + offset, sizeof(tbFullName) - offset, "%s", pTbs[i]);
uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
dbInfo->hashPrefix, dbInfo->hashSuffix);
vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, ctgHashValueComp, TD_EQ);
if (NULL == vgInfo) {
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
(int32_t)taosArrayGetSize(dbInfo->vgArray));
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
vgId[i] = vgInfo->vgId;
ctgDebug("Got tb %s vgId:%d", tbFullName, vgInfo->vgId);
}
CTG_RET(code);
}
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
if (*(uint64_t*)key1 < ((SSTableVersion*)key2)->suid) {
return -1;

View File

@ -1207,3 +1207,4 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo);
}

View File

@ -2654,3 +2654,24 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS;
}
void qStreamCloseTsdbReader(void* task) {
if (task == NULL) return;
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
SOperatorInfo* pOp = pTaskInfo->pRoot;
qDebug("stream close tsdb reader, reset status uid %" PRId64 " ts %" PRId64, pTaskInfo->streamInfo.lastStatus.uid,
pTaskInfo->streamInfo.lastStatus.ts);
pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0};
while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pInfo = pDownstreamOp->info;
if (pInfo->pTableScanOp) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
return;
}
}
}
}

View File

@ -306,12 +306,14 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1;
pCost->totalRows += pBlock->info.rows;
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
pCost->skipBlocks += 1;
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
pCost->loadBlockStatis += 1;
@ -321,6 +323,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS;
} else {
qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
@ -342,6 +345,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pCost->filterOutBlocks += 1;
(*status) = FUNC_DATA_REQUIRED_FILTEROUT;
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS;
}
}
@ -356,7 +360,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1;
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
*status = FUNC_DATA_REQUIRED_FILTEROUT;
return TSDB_CODE_SUCCESS;
}
@ -1584,7 +1588,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
qDebug("queue scan tsdb return %d rows", pResult->info.rows);
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64, pResult->info.rows,
pResult->info.window.skey, pResult->info.window.ekey);
pTaskInfo->streamInfo.returned = 1;
return pResult;
} else {
@ -2554,6 +2559,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
}
STsdbReader* reader = pInfo->base.dataReader;
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
@ -2588,6 +2594,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
qTrace("tsdb/read-table-data: %p, close reader", reader);
tsdbReaderClose(pInfo->base.dataReader);
pInfo->base.dataReader = NULL;
return pBlock;

View File

@ -175,8 +175,6 @@ _end:
void taosCleanupKeywordsTable();
SToken taosTokenDup(SToken *pToken, char *buf, int32_t len);
#ifdef __cplusplus
}
#endif

View File

@ -18,6 +18,16 @@
#include "parToken.h"
#include "ttime.h"
static void clearColValArray(SArray* pCols) {
int32_t num = taosArrayGetSize(pCols);
for (int32_t i = 0; i < num; ++i) {
SColVal* pCol = taosArrayGet(pCols, i);
if (TSDB_DATA_TYPE_NCHAR == pCol->type) {
taosMemoryFreeClear(pCol->value.pData);
}
}
}
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
int32_t msgBufLen) {
SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
@ -189,7 +199,7 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32
SSchema* pColSchema = schema + index;
SColVal* pVal = taosArrayGet(pTableCxt->pValues, index);
SSmlKv* kv = (SSmlKv*)data;
if(kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0){
if(kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || kv->type != pColSchema->type){
ret = TSDB_CODE_SML_INVALID_DATA;
goto end;
}
@ -207,9 +217,11 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32
}
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, size, &len)) {
if (errno == E2BIG) {
taosMemoryFree(pUcs4);
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
goto end;
}
taosMemoryFree(pUcs4);
ret = TSDB_CODE_TSC_INVALID_VALUE;
goto end;
}
@ -316,7 +328,10 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
continue;
}
SSmlKv* kv = *(SSmlKv**)p;
if(kv->type != pColSchema->type){
ret = buildInvalidOperationMsg(&pBuf, "kv type not equal to col type");
goto end;
}
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
}
@ -354,10 +369,12 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
goto end;
}
insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow));
clearColValArray(pTableCxt->pValues);
}
end:
insDestroyBoundColInfo(&bindTags);
tdDestroySVCreateTbReq(pCreateTblReq);
taosMemoryFree(pCreateTblReq);
taosArrayDestroy(tagName);
return ret;

View File

@ -1161,6 +1161,7 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql,
return TSDB_CODE_OUT_OF_MEMORY;
}
if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)pUcs4, pSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
taosMemoryFree(pUcs4);
if (errno == E2BIG) {
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
}

View File

@ -73,8 +73,6 @@ int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const c
char* p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);
if (p != NULL) { // db has been specified in sql string so we ignore current db path
assert(*p == TS_PATH_DELIMITER[0]);
int32_t dbLen = p - pTableName->z;
if (dbLen <= 0) {
return buildInvalidOperationMsg(pMsgBuf, msg2);
@ -106,8 +104,6 @@ int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const c
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
assert(pTableName->n < TSDB_TABLE_FNAME_LEN);
char name[TSDB_TABLE_FNAME_LEN] = {0};
strncpy(name, pTableName->z, pTableName->n);
strdequote(name);

View File

@ -716,14 +716,3 @@ void taosCleanupKeywordsTable() {
taosHashCleanup(m);
}
}
SToken taosTokenDup(SToken* pToken, char* buf, int32_t len) {
assert(pToken != NULL && buf != NULL && len > pToken->n);
strncpy(buf, pToken->z, pToken->n);
buf[pToken->n] = 0;
SToken token = *pToken;
token.z = buf;
return token;
}

View File

@ -3902,7 +3902,8 @@ static int32_t checkDbKeepOption(STranslateContext* pCxt, SDatabaseOptions* pOpt
if (pOptions->keep[0] < TSDB_MIN_KEEP || pOptions->keep[1] < TSDB_MIN_KEEP || pOptions->keep[2] < TSDB_MIN_KEEP ||
pOptions->keep[0] > tsdbMaxKeep || pOptions->keep[1] > tsdbMaxKeep || pOptions->keep[2] > tsdbMaxKeep) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option keep: %" PRId64 ", %" PRId64 ", %" PRId64 " valid range: [%dm, %dm]",
"Invalid option keep: %" PRId64 ", %" PRId64 ", %" PRId64
" valid range: [%dm, %" PRId64 "m]",
pOptions->keep[0], pOptions->keep[1], pOptions->keep[2], TSDB_MIN_KEEP, tsdbMaxKeep);
}
@ -5864,6 +5865,7 @@ static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateSt
if (TSDB_CODE_SUCCESS == code) {
code = adjustStreamQueryForExistTableImpl(pCxt, pStmt, pMeta);
}
taosMemoryFree(pMeta);
return code;
}
@ -6551,7 +6553,7 @@ static int32_t createOperatorNode(EOperatorType opType, const char* pColName, SN
nodesDestroyNode((SNode*)pOper);
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(((SColumnNode*)pOper->pLeft)->colName, pColName);
snprintf(((SColumnNode*)pOper->pLeft)->colName, sizeof(((SColumnNode*)pOper->pLeft)->colName), "%s", pColName);
*pOp = (SNode*)pOper;
return TSDB_CODE_SUCCESS;

View File

@ -33,7 +33,6 @@ std::unique_ptr<MockCatalogService> g_mockCatalogService;
class TableBuilder : public ITableBuilder {
public:
virtual TableBuilder& addColumn(const string& name, int8_t type, int32_t bytes) {
assert(colId_ <= schema()->tableInfo.numOfTags + schema()->tableInfo.numOfColumns);
SSchema* col = schema()->schema + (colId_ - 1);
col->type = type;
col->colId = colId_++;

View File

@ -208,10 +208,9 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) {
return 0;
}
int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) {
if (pFile == NULL) {
return 0;
if (pFile == NULL || pFile->fd < 0) {
return -1;
}
assert(pFile->fd >= 0); // Please check if you have closed the file.
#ifdef WINDOWS
@ -265,7 +264,10 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
} else {
mode = (tdFileOptions & TD_FILE_TEXT) ? "rt+" : "rb+";
}
assert(!(tdFileOptions & TD_FILE_EXCL));
ASSERT(!(tdFileOptions & TD_FILE_EXCL));
if (tdFileOptions & TD_FILE_EXCL) {
return NULL;
}
fp = fopen(path, mode);
if (fp == NULL) {
return NULL;
@ -364,7 +366,10 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
#if FILE_WITH_LOCK
taosThreadRwlockRdlock(&(pFile->rwlock));
#endif
assert(pFile->fd >= 0); // Please check if you have closed the file.
ASSERT(pFile->fd >= 0); // Please check if you have closed the file.
if (pFile->fd < 0) {
return -1;
}
int64_t leftbytes = count;
int64_t readbytes;
char *tbuf = (char *)buf;
@ -408,7 +413,10 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset)
#if FILE_WITH_LOCK
taosThreadRwlockRdlock(&(pFile->rwlock));
#endif
assert(pFile->fd >= 0); // Please check if you have closed the file.
ASSERT(pFile->fd >= 0); // Please check if you have closed the file.
if (pFile->fd < 0) {
return -1;
}
#ifdef WINDOWS
size_t pos = _lseeki64(pFile->fd, 0, SEEK_CUR);
_lseeki64(pFile->fd, offset, SEEK_SET);
@ -466,7 +474,10 @@ int64_t taosPWriteFile(TdFilePtr pFile, const void *buf, int64_t count, int64_t
#if FILE_WITH_LOCK
taosThreadRwlockWrlock(&(pFile->rwlock));
#endif
assert(pFile->fd >= 0); // Please check if you have closed the file.
ASSERT(pFile->fd >= 0); // Please check if you have closed the file.
if (pFile->fd < 0) {
return 0;
}
#ifdef WINDOWS
size_t pos = _lseeki64(pFile->fd, 0, SEEK_CUR);
_lseeki64(pFile->fd, offset, SEEK_SET);
@ -485,7 +496,10 @@ int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
#if FILE_WITH_LOCK
taosThreadRwlockRdlock(&(pFile->rwlock));
#endif
assert(pFile->fd >= 0); // Please check if you have closed the file.
ASSERT(pFile->fd >= 0); // Please check if you have closed the file.
if (pFile->fd < 0) {
return -1;
}
#ifdef WINDOWS
int64_t ret = _lseeki64(pFile->fd, offset, whence);
#else
@ -501,7 +515,10 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0); // Please check if you have closed the file.
ASSERT(pFile->fd >= 0); // Please check if you have closed the file.
if (pFile->fd < 0) {
return -1;
}
struct stat fileStat;
#ifdef WINDOWS
@ -525,6 +542,10 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
}
int32_t taosLockFile(TdFilePtr pFile) {
ASSERT(pFile->fd >= 0); // Please check if you have closed the file.
if(pFile->fd < 0) {
return -1;
}
#ifdef WINDOWS
BOOL fSuccess = FALSE;
LARGE_INTEGER fileSize;
@ -543,13 +564,15 @@ int32_t taosLockFile(TdFilePtr pFile) {
}
return 0;
#else
assert(pFile->fd >= 0); // Please check if you have closed the file.
return (int32_t)flock(pFile->fd, LOCK_EX | LOCK_NB);
#endif
}
int32_t taosUnLockFile(TdFilePtr pFile) {
ASSERT(pFile->fd >= 0);
if(pFile->fd < 0) {
return 0;
}
#ifdef WINDOWS
BOOL fSuccess = FALSE;
OVERLAPPED overlapped = {0};
@ -561,19 +584,19 @@ int32_t taosUnLockFile(TdFilePtr pFile) {
}
return 0;
#else
assert(pFile->fd >= 0); // Please check if you have closed the file.
return (int32_t)flock(pFile->fd, LOCK_UN | LOCK_NB);
#endif
}
int32_t taosFtruncateFile(TdFilePtr pFile, int64_t l_size) {
#ifdef WINDOWS
if (pFile->fd < 0) {
errno = EBADF;
if (pFile == NULL) {
return 0;
}
if(pFile->fd < 0) {
printf("Ftruncate file error, fd arg was negative\n");
return -1;
}
#ifdef WINDOWS
HANDLE h = (HANDLE)_get_osfhandle(pFile->fd);
@ -618,11 +641,6 @@ int32_t taosFtruncateFile(TdFilePtr pFile, int64_t l_size) {
return 0;
#else
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0); // Please check if you have closed the file.
return ftruncate(pFile->fd, l_size);
#endif
}
@ -650,7 +668,10 @@ int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, in
if (pFileOut == NULL || pFileIn == NULL) {
return 0;
}
assert(pFileIn->fd >= 0 && pFileOut->fd >= 0);
ASSERT(pFileIn->fd >= 0 && pFileOut->fd >= 0);
if(pFileIn->fd < 0 || pFileOut->fd < 0) {
return 0;
}
#ifdef WINDOWS
@ -743,11 +764,9 @@ int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, in
}
void taosFprintfFile(TdFilePtr pFile, const char *format, ...) {
if (pFile == NULL) {
if (pFile == NULL || pFile->fp == NULL) {
return;
}
assert(pFile->fp != NULL);
va_list ap;
va_start(ap, format);
vfprintf(pFile->fp, format, ap);
@ -772,7 +791,10 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) {
if (*ptrBuf != NULL) {
taosMemoryFreeClear(*ptrBuf);
}
assert(pFile->fp != NULL);
ASSERT(pFile->fp != NULL);
if (pFile->fp == NULL) {
return -1;
}
#ifdef WINDOWS
*ptrBuf = taosMemoryMalloc(1024);
if (*ptrBuf == NULL) return -1;
@ -792,7 +814,10 @@ int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf) {
if (pFile == NULL || buf == NULL) {
return -1;
}
assert(pFile->fp != NULL);
ASSERT(pFile->fp != NULL);
if (pFile->fp == NULL) {
return -1;
}
if (fgets(buf, maxSize, pFile->fp) == NULL) {
return -1;
}
@ -801,9 +826,12 @@ int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf) {
int32_t taosEOFFile(TdFilePtr pFile) {
if (pFile == NULL) {
return 0;
return -1;
}
ASSERT(pFile->fp != NULL);
if(pFile->fp == NULL) {
return -1;
}
assert(pFile->fp != NULL);
return feof(pFile->fp);
}

View File

@ -266,7 +266,10 @@ void *taosMemoryRealloc(void *ptr, int64_t size) {
if (ptr == NULL) return taosMemoryMalloc(size);
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
if (tpTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) {
+ return NULL;
+ }
TdMemoryInfo tdMemoryInfo;
memcpy(&tdMemoryInfo, pTdMemoryInfo, sizeof(TdMemoryInfo));
@ -288,8 +291,10 @@ void *taosMemoryStrDup(const char *ptr) {
if (ptr == NULL) return NULL;
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
if (pTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) {
+ return NULL;
+ }
void *tmp = tstrdup(pTdMemoryInfo);
if (tmp == NULL) return NULL;
@ -323,7 +328,10 @@ int64_t taosMemorySize(void *ptr) {
#ifdef USE_TD_MEMORY
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
if (pTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) {
+ return NULL;
+ }
return pTdMemoryInfo->memorySize;
#else
@ -348,7 +356,7 @@ void taosMemoryTrim(int32_t size) {
void* taosMemoryMallocAlign(uint32_t alignment, int64_t size) {
#ifdef USE_TD_MEMORY
assert(0);
ASSERT(0);
#else
#if defined(LINUX)
void* p = memalign(alignment, size);

View File

@ -86,8 +86,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
while ((rc = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) continue;
return rc;
/* This should have timed out */
// assert(errno == ETIMEDOUT);
// assert(rc != 0);
// ASSERT(errno == ETIMEDOUT);
// ASSERT(rc != 0);
// GetSystemTimeAsFileTime(&ft_after);
// // We specified a non-zero wait. Time must advance.
// if (ft_before.dwLowDateTime == ft_after.dwLowDateTime && ft_before.dwHighDateTime == ft_after.dwHighDateTime)

View File

@ -298,7 +298,7 @@ int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void
return -1;
}
#ifdef WINDOWS
assert(0);
ASSERT(0);
return 0;
#else
return getsockopt(pSocket->fd, level, optname, optval, (int *)optlen);
@ -662,7 +662,7 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
int taosGetLocalIp(const char *eth, char *ip) {
#if defined(WINDOWS)
// DO NOTHAING
assert(0);
ASSERT(0);
return 0;
#else
int fd;
@ -689,7 +689,7 @@ int taosGetLocalIp(const char *eth, char *ip) {
int taosValidIp(uint32_t ip) {
#if defined(WINDOWS)
// DO NOTHAING
assert(0);
ASSERT(0);
return 0;
#else
int ret = -1;
@ -924,7 +924,7 @@ uint32_t ip2uint(const char *const ip_addr) {
void taosBlockSIGPIPE() {
#ifdef WINDOWS
// assert(0);
// ASSERT(0);
#else
sigset_t signal_mask;
sigemptyset(&signal_mask);
@ -994,7 +994,7 @@ int32_t taosGetFqdn(char *fqdn) {
#else
printf("failed to get hostname, reason:%s\n", strerror(errno));
#endif
assert(0);
ASSERT(0);
return -1;
}
@ -1031,7 +1031,7 @@ void taosIgnSIGPIPE() { signal(SIGPIPE, SIG_IGN); }
void taosSetMaskSIGPIPE() {
#ifdef WINDOWS
// assert(0);
// ASSERT(0);
#else
sigset_t signal_mask;
sigemptyset(&signal_mask);

View File

@ -112,7 +112,7 @@ int32_t tasoUcs4Compare(TdUcs4 *f1_ucs4, TdUcs4 *f2_ucs4, int32_t bytes) {
}
TdUcs4 *tasoUcs4Copy(TdUcs4 *target_ucs4, TdUcs4 *source_ucs4, int32_t len_ucs4) {
assert(taosMemorySize(target_ucs4) >= len_ucs4 * sizeof(TdUcs4));
ASSERT(taosMemorySize(target_ucs4) >= len_ucs4 * sizeof(TdUcs4));
return memcpy(target_ucs4, source_ucs4, len_ucs4 * sizeof(TdUcs4));
}
@ -355,8 +355,8 @@ int64_t taosStr2Int64(const char *str, char **pEnd, int32_t radix) {
if (errno == EINVAL) errno = 0;
#endif
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
#endif
return tmp;
}
@ -367,8 +367,8 @@ uint64_t taosStr2UInt64(const char *str, char **pEnd, int32_t radix) {
if (errno == EINVAL) errno = 0;
#endif
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
#endif
return tmp;
}
@ -379,8 +379,8 @@ int32_t taosStr2Int32(const char *str, char **pEnd, int32_t radix) {
if (errno == EINVAL) errno = 0;
#endif
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
#endif
return tmp;
}
@ -391,8 +391,8 @@ uint32_t taosStr2UInt32(const char *str, char **pEnd, int32_t radix) {
if (errno == EINVAL) errno = 0;
#endif
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
#endif
return tmp;
}
@ -403,10 +403,10 @@ int16_t taosStr2Int16(const char *str, char **pEnd, int32_t radix) {
if (errno == EINVAL) errno = 0;
#endif
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
assert(tmp >= SHRT_MIN);
assert(tmp <= SHRT_MAX);
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
ASSERT(tmp >= SHRT_MIN);
ASSERT(tmp <= SHRT_MAX);
#endif
return (int16_t)tmp;
}
@ -417,9 +417,9 @@ uint16_t taosStr2UInt16(const char *str, char **pEnd, int32_t radix) {
if (errno == EINVAL) errno = 0;
#endif
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
assert(tmp <= USHRT_MAX);
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
ASSERT(tmp <= USHRT_MAX);
#endif
return (uint16_t)tmp;
}
@ -427,10 +427,10 @@ uint16_t taosStr2UInt16(const char *str, char **pEnd, int32_t radix) {
int8_t taosStr2Int8(const char *str, char **pEnd, int32_t radix) {
int32_t tmp = strtol(str, pEnd, radix);
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
assert(tmp >= SCHAR_MIN);
assert(tmp <= SCHAR_MAX);
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
ASSERT(tmp >= SCHAR_MIN);
ASSERT(tmp <= SCHAR_MAX);
#endif
return tmp;
}
@ -441,9 +441,9 @@ uint8_t taosStr2UInt8(const char *str, char **pEnd, int32_t radix) {
if (errno == EINVAL) errno = 0;
#endif
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
assert(tmp <= UCHAR_MAX);
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
ASSERT(tmp <= UCHAR_MAX);
#endif
return tmp;
}
@ -451,9 +451,9 @@ uint8_t taosStr2UInt8(const char *str, char **pEnd, int32_t radix) {
double taosStr2Double(const char *str, char **pEnd) {
double tmp = strtod(str, pEnd);
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
assert(tmp != HUGE_VAL);
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
ASSERT(tmp != HUGE_VAL);
#endif
return tmp;
}
@ -461,10 +461,10 @@ double taosStr2Double(const char *str, char **pEnd) {
float taosStr2Float(const char *str, char **pEnd) {
float tmp = strtof(str, pEnd);
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
assert(tmp != HUGE_VALF);
assert(tmp != NAN);
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
ASSERT(tmp != HUGE_VALF);
ASSERT(tmp != NAN);
#endif
return tmp;
}

View File

@ -249,7 +249,7 @@ void taosGetSystemInfo() {
int32_t taosGetEmail(char *email, int32_t maxLen) {
#ifdef WINDOWS
// assert(0);
// ASSERT(0);
#elif defined(_TD_DARWIN_64)
const char *filepath = "/usr/local/taos/email";
@ -863,7 +863,7 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
char *taosGetCmdlineByPID(int pid) {
#ifdef WINDOWS
assert(0);
ASSERT(0);
return "";
#elif defined(_TD_DARWIN_64)
static char cmdline[1024];

View File

@ -90,7 +90,7 @@ typedef struct FILE TdCmd;
void* taosLoadDll(const char* filename) {
#if defined(WINDOWS)
assert(0);
ASSERT(0);
return NULL;
#elif defined(_TD_DARWIN_64)
return NULL;
@ -109,7 +109,7 @@ void* taosLoadDll(const char* filename) {
void* taosLoadSym(void* handle, char* name) {
#if defined(WINDOWS)
assert(0);
ASSERT(0);
return NULL;
#elif defined(_TD_DARWIN_64)
return NULL;
@ -130,7 +130,7 @@ void* taosLoadSym(void* handle, char* name) {
void taosCloseDll(void* handle) {
#if defined(WINDOWS)
assert(0);
ASSERT(0);
return;
#elif defined(_TD_DARWIN_64)
return;

View File

@ -233,7 +233,8 @@ int32_t taosThreadSpinDestroy(TdThreadSpinlock *lock) {
int32_t taosThreadSpinInit(TdThreadSpinlock *lock, int32_t pshared) {
#ifdef TD_USE_SPINLOCK_AS_MUTEX
assert(pshared == 0);
ASSERT(pshared == 0);
if (pshared != 0) return -1;
return pthread_mutex_init((pthread_mutex_t *)lock, NULL);
#else
return pthread_spin_init((pthread_spinlock_t *)lock, pshared);

View File

@ -82,6 +82,7 @@
,,y,script,./test.sh -f tsim/insert/tcp.sim
,,y,script,./test.sh -f tsim/insert/update0.sim
,,y,script,./test.sh -f tsim/insert/update1_sort_merge.sim
,,y,script,./test.sh -f tsim/insert/update2.sim
,,y,script,./test.sh -f tsim/parser/alter__for_community_version.sim
,,y,script,./test.sh -f tsim/parser/alter_column.sim
,,y,script,./test.sh -f tsim/parser/alter_stable.sim
@ -425,8 +426,8 @@
,,n,system-test,python3 ./test.py -f 0-others/compatibility.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
,,n,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
,,n,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_stable.py

View File

@ -26,7 +26,10 @@
#include <pthread.h>
#include "taos.h"
int rtTables = 20;
#define RT_TABLE_NUM 100
int rtTables = RT_TABLE_NUM;
int rtTableUs[RT_TABLE_NUM] = {0};
char hostName[128];
static void rtExecSQL(TAOS *taos, char *command) {
@ -101,6 +104,22 @@ int rtPrepare(TAOS ** p, int prefix, int suffix) {
return 0;
}
int32_t rtGetTimeOfDay(struct timeval *tv) {
return gettimeofday(tv, NULL);
}
static int64_t rtGetTimestampMs() {
struct timeval systemTime;
rtGetTimeOfDay(&systemTime);
return (int64_t)systemTime.tv_sec * 1000LL + (int64_t)systemTime.tv_usec/1000;
}
static int64_t rtGetTimestampUs() {
struct timeval systemTime;
rtGetTimeOfDay(&systemTime);
return (int64_t)systemTime.tv_sec * 1000000LL + (int64_t)systemTime.tv_usec;
}
int rtGetDbRouteInfo(TAOS * taos) {
TAOS_DB_ROUTE_INFO dbInfo;
int code = taos_get_db_route_info(taos, "db1", &dbInfo);
@ -126,7 +145,10 @@ int rtGetTableRouteInfo(TAOS * taos) {
char sql[1024] = {0};
for (int32_t i = 0; i < rtTables; ++i) {
sprintf(table, "tb%d", i);
int64_t startTs = rtGetTimestampUs();
int code = taos_get_table_vgId(taos, "db1", table, &vgId1);
int64_t endTs = rtGetTimestampUs();
rtTableUs[i] = (int)(endTs - startTs);
if (code) {
rtExit("taos_get_table_vgId", taos_errstr(NULL));
}
@ -142,9 +164,61 @@ int rtGetTableRouteInfo(TAOS * taos) {
}
}
printf("table vgId use us:");
for (int32_t i = 0; i < rtTables; ++i) {
printf("%d ", rtTableUs[i]);
}
printf("\n");
return 0;
}
int rtGetTablesRouteInfo(TAOS * taos) {
char *table = {0};
int *vgId1 = malloc(rtTables * sizeof(int));
int vgId2 = 0;
char sql[1024] = {0};
const char *tbs[RT_TABLE_NUM] = {0};
for (int32_t i = 0; i < rtTables; ++i) {
table = malloc(10);
sprintf(table, "tb%d", i);
tbs[i] = table;
}
int64_t startTs = rtGetTimestampUs();
int code = taos_get_tables_vgId(taos, "db1", tbs, rtTables, vgId1);
int64_t endTs = rtGetTimestampUs();
rtTableUs[0] = (int)(endTs - startTs);
if (code) {
rtExit("taos_get_tables_vgId", taos_errstr(NULL));
}
for (int32_t i = 0; i < rtTables; ++i) {
sprintf(sql, "select vgroup_id from information_schema.ins_tables where table_name=\"tb%d\"", i);
rtFetchVgId(taos, sql, &vgId2);
if (vgId1[i] != vgId2) {
fprintf(stderr, "!!!! table tb%d vgId mis-match, vgId(api):%d, vgId(sys):%d\n", i, vgId1[i], vgId2);
exit(1);
} else {
printf("table tb%d vgId %d\n", i, vgId1[i]);
}
}
printf("tables vgId use us:%d\n", rtTableUs[0]);
for (int32_t i = 0; i < rtTables; ++i) {
free((void*)tbs[i]);
}
free(vgId1);
return 0;
}
void rtClose(TAOS * taos) {
taos_close(taos);
}
@ -170,6 +244,16 @@ int rtRunCase2(void) {
return 0;
}
int rtRunCase3(void) {
TAOS *taos = NULL;
rtPrepare(&taos, 0, 0);
rtGetTablesRouteInfo(taos);
rtClose(taos);
return 0;
}
int main(int argc, char *argv[]) {
if (argc != 2) {
printf("usage: %s server-ip\n", argv[0]);
@ -182,6 +266,7 @@ int main(int argc, char *argv[]) {
rtRunCase1();
rtRunCase2();
rtRunCase3();
int32_t l = 5;
while (l) {

View File

@ -0,0 +1,221 @@
################################################################################################
# migrate from 2.0 insert_update2.sim
################################################################################################
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect
### 4096*0.8 - 1 = 3275
$rowSuperBlk = 3275
### 4096 - 3275 -1 - 1 = 819
$rowSubBlk = 819
$ts0 = 1672372800000
$ts1 = 1672372900000
$ts2 = 1672686800000
$i = 0
$db = db0
$stb1 = stb1
$tb1 = tb1
$stb2 = stb2
$tb2 = tb2
print ====== create database
sql drop database if exists $db
sql create database $db keep 1000 duration 10
print ====== create tables
sql use $db
sql create table $stb1 (ts timestamp, c1 bigint, c2 bigint, c3 bigint) tags(t1 int)
sql create table $stb2 (ts timestamp, c1 bigint, c2 bigint, c3 bigint, c4 bigint, c5 bigint, c6 bigint, c7 bigint, c8 bigint, c9 bigint, c10 bigint, c11 bigint, c12 bigint, c13 bigint, c14 bigint, c15 bigint, c16 bigint, c17 bigint, c18 bigint, c19 bigint, c20 bigint, c21 bigint, c22 bigint, c23 bigint, c24 bigint, c25 bigint, c26 bigint, c27 bigint, c28 bigint, c29 bigint, c30 bigint) tags(t1 int)
sql create table $tb1 using $stb1 tags(1)
sql create table $tb2 using $stb2 tags(2)
print ====== tables created
print ========== step 1: merge dataRow in mem
$i = 0
while $i < $rowSuperBlk
$xs = $i * 10
$ts = $ts2 + $xs
sql insert into $tb1 (ts,c1) values ( $ts , $i )
$i = $i + 1
endw
sql insert into $tb1 values ( $ts0 , 1,NULL,0)
sql insert into $tb1 (ts,c2,c3) values ( $ts0 , 1,1)
sql select * from $tb1 where ts = $ts0
if $rows != 1 then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data02 != 1 then
return -1
endi
if $data03 != 1 then
return -1
endi
print ========== step 2: merge kvRow in mem
$i = 0
while $i < $rowSuperBlk
$xs = $i * 10
$ts = $ts2 + $xs
sql insert into $tb2 (ts,c1) values ( $ts , $i )
$i = $i + 1
endw
sql insert into $tb2 (ts,c3,c8,c10) values ( $ts0 , 1,NULL,0)
sql insert into $tb2 (ts,c8,c10) values ( $ts0 , 1,1)
sql select ts,c1,c3,c8,c10 from $tb2 where ts = $ts0
if $rows != 1 then
return -1
endi
if $data01 != NULL then
return -1
endi
if $data02 != 1 then
return -1
endi
if $data03 != 1 then
return -1
endi
if $data04 != 1 then
return -1
endi
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 2000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sleep 2000
print ========== step 3: merge dataRow in file
sql insert into $tb1 (ts,c1) values ( $ts0 , 2)
print ========== step 4: merge kvRow in file
sql insert into $tb2 (ts,c3) values ( $ts0 , 2)
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 2000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sleep 2000
sql select * from $tb1 where ts = $ts0
if $rows != 1 then
return -1
endi
if $data01 != 2 then
return -1
endi
if $data02 != 1 then
return -1
endi
if $data03 != 1 then
return -1
endi
sql select ts,c1,c3,c8,c10 from $tb2 where ts = $ts0
if $rows != 1 then
return -1
endi
if $data01 != NULL then
return -1
endi
if $data02 != 2 then
return -1
endi
if $data03 != 1 then
return -1
endi
if $data04 != 1 then
return -1
endi
print ========== step 5: merge dataRow in file/mem
$i = 0
while $i < $rowSubBlk
$xs = $i * 1
$ts = $ts1 + $xs
sql insert into $tb1 (ts,c1) values ( $ts , $i )
$i = $i + 1
endw
print ========== step 6: merge kvRow in file/mem
$i = 0
while $i < $rowSubBlk
$xs = $i * 1
$ts = $ts1 + $xs
sql insert into $tb2 (ts,c1) values ( $ts , $i )
$i = $i + 1
endw
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 2000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sleep 2000
sql insert into $tb1 (ts,c3) values ( $ts0 , 3)
sql insert into $tb2 (ts,c3) values ( $ts0 , 3)
$tsN = $ts0 + 1
sql insert into $tb1 (ts,c1,c3) values ( $tsN , 1,0)
sql insert into $tb2 (ts,c3,c8) values ( $tsN , 100,200)
$tsN = $ts0 + 2
sql insert into $tb1 (ts,c1,c3) values ( $tsN , 1,0)
sql insert into $tb2 (ts,c3,c8) values ( $tsN , 100,200)
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 2000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sleep 2000
sql select * from $tb1 where ts = $ts0
if $rows != 1 then
return -1
endi
if $data01 != 2 then
return -1
endi
if $data02 != 1 then
return -1
endi
if $data03 != 3 then
return -1
endi
sql select ts,c1,c3,c8,c10 from $tb2 where ts = $ts0
if $rows != 1 then
return -1
endi
if $data01 != NULL then
return -1
endi
if $data02 != 3 then
return -1
endi
if $data03 != 1 then
return -1
endi
if $data04 != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -715,7 +715,10 @@ void putBackAutoPtr(int type, STire* tire) {
} else {
tires[type]->ref--;
assert(tires[type]->ref > 0);
ASSERT(tires[type]->ref > 0);
if (tires[type]->ref <= 0) {
return;
}
}
taosThreadMutexUnlock(&tiresMutex);

View File

@ -40,7 +40,7 @@ static void shellPositionCursorEnd(SShellCmd *cmd);
static void shellPrintChar(char c, int32_t times);
static void shellPositionCursor(int32_t step, int32_t direction);
static void shellUpdateBuffer(SShellCmd *cmd);
static int32_t shellIsReadyGo(SShellCmd *cmd);
static bool shellIsReadyGo(SShellCmd *cmd);
static void shellGetMbSizeInfo(const char *str, int32_t *size, int32_t *width);
static void shellResetCommand(SShellCmd *cmd, const char s[]);
void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos);
@ -62,7 +62,8 @@ int32_t shellCountPrefixOnes(uint8_t c) {
}
void shellGetPrevCharSize(const char *str, int32_t pos, int32_t *size, int32_t *width) {
assert(pos > 0);
ASSERT(pos > 0);
if (pos <= 0) return;
TdWchar wc;
*size = 0;
@ -75,13 +76,14 @@ void shellGetPrevCharSize(const char *str, int32_t pos, int32_t *size, int32_t *
}
taosMbToWchar(&wc, str + pos, MB_CUR_MAX);
// assert(rc == *size); // it will be core, if str is encode by utf8 and taos charset is gbk
// ASSERT(rc == *size); // it will be core, if str is encode by utf8 and taos charset is gbk
*width = taosWcharWidth(wc);
}
void shellGetNextCharSize(const char *str, int32_t pos, int32_t *size, int32_t *width) {
assert(pos >= 0);
ASSERT(pos >= 0);
if(pos < 0) return;
TdWchar wc;
*size = taosMbToWchar(&wc, str + pos, MB_CUR_MAX);
@ -89,7 +91,8 @@ void shellGetNextCharSize(const char *str, int32_t pos, int32_t *size, int32_t *
}
void shellInsertChar(SShellCmd *cmd, char *c, int32_t size) {
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
ASSERT(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
if(cmd->cursorOffset > cmd->commandSize || cmd->endOffset < cmd->screenOffset) return;
TdWchar wc;
if (taosMbToWchar(&wc, c, size) < 0) return;
@ -135,7 +138,8 @@ void shellInsertStr(SShellCmd *cmd, char *str, int32_t size) {
}
void shellBackspaceChar(SShellCmd *cmd) {
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
ASSERT(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
if(cmd->cursorOffset > cmd->commandSize || cmd->endOffset < cmd->screenOffset) return;
if (cmd->cursorOffset > 0) {
shellClearScreen(cmd->endOffset + PSIZE, cmd->screenOffset + PSIZE);
@ -155,7 +159,8 @@ void shellBackspaceChar(SShellCmd *cmd) {
}
void shellClearLineBefore(SShellCmd *cmd) {
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
ASSERT(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
if(cmd->cursorOffset > cmd->commandSize || cmd->endOffset < cmd->screenOffset) return;
shellClearScreen(cmd->endOffset + PSIZE, cmd->screenOffset + PSIZE);
memmove(cmd->command, cmd->command + cmd->cursorOffset, cmd->commandSize - cmd->cursorOffset);
@ -169,7 +174,8 @@ void shellClearLineBefore(SShellCmd *cmd) {
}
void shellClearLineAfter(SShellCmd *cmd) {
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
ASSERT(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
if(cmd->cursorOffset > cmd->commandSize || cmd->endOffset < cmd->screenOffset) return;
shellClearScreen(cmd->endOffset + PSIZE, cmd->screenOffset + PSIZE);
cmd->commandSize -= cmd->endOffset - cmd->cursorOffset;
@ -178,7 +184,8 @@ void shellClearLineAfter(SShellCmd *cmd) {
}
void shellDeleteChar(SShellCmd *cmd) {
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
ASSERT(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
if(cmd->cursorOffset > cmd->commandSize || cmd->endOffset < cmd->screenOffset) return;
if (cmd->cursorOffset < cmd->commandSize) {
shellClearScreen(cmd->endOffset + PSIZE, cmd->screenOffset + PSIZE);
@ -196,7 +203,8 @@ void shellDeleteChar(SShellCmd *cmd) {
}
void shellMoveCursorLeft(SShellCmd *cmd) {
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
ASSERT(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
if(cmd->cursorOffset > cmd->commandSize || cmd->endOffset < cmd->screenOffset) return;
if (cmd->cursorOffset > 0) {
shellClearScreen(cmd->endOffset + PSIZE, cmd->screenOffset + PSIZE);
@ -210,7 +218,8 @@ void shellMoveCursorLeft(SShellCmd *cmd) {
}
void shellMoveCursorRight(SShellCmd *cmd) {
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
ASSERT(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
if(cmd->cursorOffset > cmd->commandSize || cmd->endOffset < cmd->screenOffset) return;
if (cmd->cursorOffset < cmd->commandSize) {
shellClearScreen(cmd->endOffset + PSIZE, cmd->screenOffset + PSIZE);
@ -224,7 +233,8 @@ void shellMoveCursorRight(SShellCmd *cmd) {
}
void shellPositionCursorHome(SShellCmd *cmd) {
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
ASSERT(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
if(cmd->cursorOffset > cmd->commandSize || cmd->endOffset < cmd->screenOffset) return;
if (cmd->cursorOffset > 0) {
shellClearScreen(cmd->endOffset + PSIZE, cmd->screenOffset + PSIZE);
@ -244,7 +254,8 @@ void positionCursorMiddle(SShellCmd *cmd) {
}
void shellPositionCursorEnd(SShellCmd *cmd) {
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
ASSERT(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
if(cmd->cursorOffset > cmd->commandSize || cmd->endOffset < cmd->screenOffset) return;
if (cmd->cursorOffset < cmd->commandSize) {
shellClearScreen(cmd->endOffset + PSIZE, cmd->screenOffset + PSIZE);
@ -279,7 +290,8 @@ void shellPositionCursor(int32_t step, int32_t direction) {
}
void shellUpdateBuffer(SShellCmd *cmd) {
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
ASSERT(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
if(cmd->cursorOffset > cmd->commandSize || cmd->endOffset < cmd->screenOffset) return;
if (shellRegexMatch(cmd->buffer, "(\\s+$)|(^$)", REG_EXTENDED)) strcat(cmd->command, " ");
strcat(cmd->buffer, cmd->command);
@ -293,8 +305,9 @@ void shellUpdateBuffer(SShellCmd *cmd) {
shellShowOnScreen(cmd);
}
int32_t shellIsReadyGo(SShellCmd *cmd) {
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
bool shellIsReadyGo(SShellCmd *cmd) {
ASSERT(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
if(cmd->cursorOffset > cmd->commandSize || cmd->endOffset < cmd->screenOffset) return false;
char *total = (char *)taosMemoryCalloc(1, SHELL_MAX_COMMAND_SIZE);
memset(cmd->command + cmd->commandSize, 0, SHELL_MAX_COMMAND_SIZE - cmd->commandSize);
@ -305,11 +318,11 @@ int32_t shellIsReadyGo(SShellCmd *cmd) {
"\\s*clear\\s*$)";
if (shellRegexMatch(total, reg_str, REG_EXTENDED | REG_ICASE)) {
taosMemoryFree(total);
return 1;
return true;
}
taosMemoryFree(total);
return 0;
return false;
}
void shellGetMbSizeInfo(const char *str, int32_t *size, int32_t *width) {
@ -321,7 +334,8 @@ void shellGetMbSizeInfo(const char *str, int32_t *size, int32_t *width) {
}
void shellResetCommand(SShellCmd *cmd, const char s[]) {
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
ASSERT(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
if(cmd->cursorOffset > cmd->commandSize || cmd->endOffset < cmd->screenOffset) return;
shellClearScreen(cmd->endOffset + PSIZE, cmd->screenOffset + PSIZE);
memset(cmd->buffer, 0, SHELL_MAX_COMMAND_SIZE);
@ -399,7 +413,7 @@ void shellShowOnScreen(SShellCmd *cmd) {
int32_t ret = taosMbToWchar(&wc, str, MB_CUR_MAX);
if (ret < 0) break;
size += ret;
/* assert(size >= 0); */
/* ASSERT(size >= 0); */
int32_t width = taosWcharWidth(wc);
if (remain_column > width) {
printf("%lc", wc);

View File

@ -713,7 +713,7 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) {
}
default:
assert(false);
ASSERT(false);
}
return 0;

View File

@ -211,8 +211,7 @@ int smlProcess_json3_Test() {
taos_free_result(pRes);
const char *sql[] = {
// "[{\"metric\":\"sys.cpu.nice3\",\"timestamp\":0,\"value\":\"18\",\"tags\":{\"host\":\"web01\",\"id\":\"t1\",\"dc\":\"lga\"}}]"
"{\"metric\": \"dcxnmr\", \"timestamp\": {\"value\": 1626006833639000000, \"type\": \"ns\"}, \"value\": {\"value\": false, \"type\": \"bool\"}, \"tags\": {\"t0\": {\"value\": false, \"type\": \"bool\"}, \"t1\": {\"value\": 127, \"type\": \"tinyint\"}, \"t2\": {\"value\": 32767, \"type\": \"smallint\"}, \"t3\": {\"value\": 2147483647, \"type\": \"int\"}, \"t4\": {\"value\": 9223372036854775807, \"type\": \"bigint\"}, \"t5\": {\"value\": 11.12345027923584, \"type\": \"float\"}, \"t6\": {\"value\": 22.123456789, \"type\": \"double\"}, \"t7\": {\"value\": \"binaryTagValue\", \"type\": \"binary\"}, \"t8\": {\"value\": \"abc{aaa\", \"type\": \"nchar\"}}}"
"[{\"metric\":\"sys.cpu.nice3\",\"timestamp\":0,\"value\":\"18\",\"tags\":{\"host\":\"web01\",\"id\":\"t1\",\"dc\":\"lga\"}}]"
};
char *sql1[1] = {0};
for(int i = 0; i < 1; i++){
@ -741,26 +740,11 @@ int sml_dup_time_Test() {
taos_free_result(pRes);
const char *sql[] = {//"test_ms,t0=t c0=f 1626006833641",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11."
"12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" "
"c0=false,c1=1i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22."
"123456789f64,c7=\"xcxvwjvf\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11."
"12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" "
"c0=T,c1=2i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22."
"123456789f64,c7=\"fixrzcuq\",c8=L\"ncharColValue\",c9=7u64 1626006834639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11."
"12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" "
"c0=t,c1=3i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22."
"123456789f64,c7=\"iupzdqub\",c8=L\"ncharColValue\",c9=7u64 1626006835639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11."
"12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" "
"c0=t,c1=4i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22."
"123456789f64,c7=\"yvvtzzof\",c8=L\"ncharColValue\",c9=7u64 1626006836639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11."
"12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" "
"c0=t,c1=5i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22."
"123456789f64,c7=\"vbxpilkj\",c8=L\"ncharColValue\",c9=7u64 1626006837639000000"};
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=f,c1=1i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"xcxvwjvf\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=T,c1=2i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"fixrzcuq\",c8=L\"ncharColValue\",c9=7u64 1626006834639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=3i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"iupzdqub\",c8=L\"ncharColValue\",c9=7u64 1626006835639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=4i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"yvvtzzof\",c8=L\"ncharColValue\",c9=7u64 1626006836639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=5i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"vbxpilkj\",c8=L\"ncharColValue\",c9=7u64 1626006837639000000"};
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
@ -943,8 +927,10 @@ int sml_ttl_Test() {
pRes = taos_query(taos, "select `ttl` from information_schema.ins_tables where table_name='t_be97833a0e1f523fcdaeb6291d6fdf27'");
printf("%s result2:%s\n", __FUNCTION__, taos_errstr(pRes));
TAOS_ROW row = taos_fetch_row(pRes);
int32_t ttl = *(int32_t*)row[0];
ASSERT(ttl == 20);
if(row != NULL && *row != NULL){
int32_t ttl = *(int32_t*)row[0];
ASSERT(ttl == 20);
}
int code = taos_errno(pRes);
taos_free_result(pRes);