other: merge main.

This commit is contained in:
Haojun Liao 2023-03-14 12:13:09 +08:00
commit 32ddecc347
42 changed files with 806 additions and 619 deletions

View File

@ -460,7 +460,7 @@ pipeline {
cd ${WKC}/tests/parallel_test cd ${WKC}/tests/parallel_test
export DEFAULT_RETRY_TIME=2 export DEFAULT_RETRY_TIME=2
date date
''' + timeout_cmd + ''' time ./run.sh -e -m /home/m.json -t cases.task -b ${BRANCH_NAME}_${BUILD_ID} -l ${WKDIR}/log -o 480 ''' + extra_param + ''' ''' + timeout_cmd + ''' time ./run.sh -e -m /home/m.json -t cases.task -b ${BRANCH_NAME}_${BUILD_ID} -l ${WKDIR}/log -o 600 ''' + extra_param + '''
''' '''
} }
} }

View File

@ -41,7 +41,7 @@ TDengine 知识地图中涵盖了 TDengine 的各种知识点,揭示了各概
<td style={{padding:'1em 3em',border:0}}><img src={official_account} alt="TDengine 微信公众号" width="200" /></td> <td style={{padding:'1em 3em',border:0}}><img src={official_account} alt="TDengine 微信公众号" width="200" /></td>
</tr> </tr>
<tr align="center"> <tr align="center">
<td style={{padding:'1em 3em',border:0}}>加入“物联网大数据技术群”<br/>与大家进行技术交流</td> <td style={{padding:'1em 3em',border:0}}>加入 TDengine 微信群<br/>了解学习最新物联网技术</td>
<td style={{padding:'1em 3em',border:0}}>关注 TDengine 视频号<br/>收看技术直播与教学视频</td> <td style={{padding:'1em 3em',border:0}}>关注 TDengine 视频号<br/>收看技术直播与教学视频</td>
<td style={{padding:'1em 3em',border:0}}>关注 TDengine 公众号<br/>阅读技术文章与行业案例</td> <td style={{padding:'1em 3em',border:0}}>关注 TDengine 公众号<br/>阅读技术文章与行业案例</td>
</tr> </tr>

View File

@ -98,7 +98,8 @@ extern char *tsSvrCrashReportUri;
// query buffer management // query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node
extern int32_t tsCacheLazyLoadThreshold; // cost threshold for last/last_row loading cache as much as possible
// query client // query client
extern int32_t tsQueryPolicy; extern int32_t tsQueryPolicy;
@ -145,10 +146,10 @@ extern char tsUdfdResFuncs[];
extern char tsUdfdLdLibPath[]; extern char tsUdfdLdLibPath[];
// schemaless // schemaless
extern char tsSmlChildTableName[]; extern char tsSmlChildTableName[];
extern char tsSmlTagName[]; extern char tsSmlTagName[];
//extern bool tsSmlDataFormat; // extern bool tsSmlDataFormat;
//extern int32_t tsSmlBatchSize; // extern int32_t tsSmlBatchSize;
// wal // wal
extern int64_t tsWalFsyncDataSizeLimit; extern int64_t tsWalFsyncDataSizeLimit;

View File

@ -1136,6 +1136,7 @@ typedef struct {
int64_t numOfInsertSuccessReqs; int64_t numOfInsertSuccessReqs;
int64_t numOfBatchInsertReqs; int64_t numOfBatchInsertReqs;
int64_t numOfBatchInsertSuccessReqs; int64_t numOfBatchInsertSuccessReqs;
int32_t numOfCachedTables;
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {

View File

@ -143,10 +143,11 @@ typedef struct SSyncFSM {
void* data; void* data;
int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
SyncIndex (*FpAppliedIndexCb)(const struct SSyncFSM* pFsm);
int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm); void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx);
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SReConfigCbMeta* pMeta); void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SReConfigCbMeta* pMeta);
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm); bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm);

View File

@ -542,7 +542,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913) #define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
#define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914) #define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914)
#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal #define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal
#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) // #define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916)
#define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq // tq

View File

@ -55,6 +55,8 @@ void *taosLRUCacheValue(SLRUCache *cache, LRUHandle *handle);
size_t taosLRUCacheGetUsage(SLRUCache *cache); size_t taosLRUCacheGetUsage(SLRUCache *cache);
size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache); size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache);
int32_t taosLRUCacheGetElems(SLRUCache *cache);
void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity); void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity);
size_t taosLRUCacheGetCapacity(SLRUCache *cache); size_t taosLRUCacheGetCapacity(SLRUCache *cache);

View File

@ -194,6 +194,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
SDecoder coder; SDecoder coder;
char* string = NULL; char* string = NULL;
uDebug("processCreateStb called");
// decode and process req // decode and process req
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
@ -203,7 +204,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
goto _err; goto _err;
} }
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
uDebug("processCreateStb %s", string);
_err: _err:
tDecoderClear(&coder); tDecoderClear(&coder);
return string; return string;
@ -213,6 +214,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) {
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
SDecoder coder; SDecoder coder;
char* string = NULL; char* string = NULL;
uDebug("processAlterStb called");
// decode and process req // decode and process req
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
@ -223,6 +225,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) {
goto _err; goto _err;
} }
string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
uDebug("processAlterStb %s", string);
_err: _err:
tDecoderClear(&coder); tDecoderClear(&coder);
@ -346,6 +349,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
SVCreateTbReq* pCreateReq; SVCreateTbReq* pCreateReq;
char* string = NULL; char* string = NULL;
// decode // decode
uDebug("processCreateTable called");
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len); tDecoderInit(&decoder, data, len);
@ -359,9 +363,9 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
if (pCreateReq->type == TSDB_CHILD_TABLE) { if (pCreateReq->type == TSDB_CHILD_TABLE) {
string = buildCreateCTableJson(req.pReqs, req.nReqs); string = buildCreateCTableJson(req.pReqs, req.nReqs);
} else if (pCreateReq->type == TSDB_NORMAL_TABLE) { } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
string = string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
} }
uDebug("processCreateTable :%s", string);
} }
_exit: _exit:
@ -377,6 +381,7 @@ _exit:
} }
static char* processAutoCreateTable(STaosxRsp* rsp) { static char* processAutoCreateTable(STaosxRsp* rsp) {
uDebug("processAutoCreateTable called");
if (rsp->createTableNum <= 0) { if (rsp->createTableNum <= 0) {
uError("WriteRaw:processAutoCreateTable rsp->createTableNum <= 0"); uError("WriteRaw:processAutoCreateTable rsp->createTableNum <= 0");
goto _exit; goto _exit;
@ -402,7 +407,7 @@ static char* processAutoCreateTable(STaosxRsp* rsp) {
} }
} }
string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); string = buildCreateCTableJson(pCreateReq, rsp->createTableNum);
uDebug("processAutoCreateTable :%s", string);
_exit: _exit:
for (int i = 0; i < rsp->createTableNum; i++) { for (int i = 0; i < rsp->createTableNum; i++) {
tDecoderClear(&decoder[i]); tDecoderClear(&decoder[i]);
@ -422,6 +427,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
char* string = NULL; char* string = NULL;
cJSON* json = NULL; cJSON* json = NULL;
uDebug("processAlterTable called");
// decode // decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
@ -527,6 +533,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
break; break;
} }
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
uDebug("processAlterTable :%s", string);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
@ -539,6 +546,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
SVDropStbReq req = {0}; SVDropStbReq req = {0};
char* string = NULL; char* string = NULL;
cJSON* json = NULL; cJSON* json = NULL;
uDebug("processDropSTable called");
// decode // decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
@ -560,7 +568,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
cJSON_AddItemToObject(json, "tableName", tableName); cJSON_AddItemToObject(json, "tableName", tableName);
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
uDebug("processDropSTable :%s", string);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -573,6 +581,7 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) {
cJSON* json = NULL; cJSON* json = NULL;
char* string = NULL; char* string = NULL;
uDebug("processDeleteTable called");
// decode and process req // decode and process req
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
@ -599,7 +608,7 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) {
cJSON_AddItemToObject(json, "sql", sqlJson); cJSON_AddItemToObject(json, "sql", sqlJson);
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
uDebug("processDeleteTable :%s", string);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
tDecoderClear(&coder); tDecoderClear(&coder);
@ -612,6 +621,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
char* string = NULL; char* string = NULL;
cJSON* json = NULL; cJSON* json = NULL;
uDebug("processDropTable called");
// decode // decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
@ -641,7 +651,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
cJSON_AddItemToObject(json, "tableNameList", tableNameList); cJSON_AddItemToObject(json, "tableNameList", tableNameList);
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
uDebug("processDropTable :%s", string);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -655,6 +665,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
uDebug("taosCreateStb called");
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
@ -698,6 +709,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
pReq.source = TD_REQ_FROM_TAOX; pReq.source = TD_REQ_FROM_TAOX;
pReq.igExists = true; pReq.igExists = true;
uDebug("taosCreateStb name:%s suid:%"PRId64" processSuid:%"PRId64, req.name, req.suid, pReq.suid);
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SName tableName; SName tableName;
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name); tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
@ -744,6 +756,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
uDebug("taosDropStb called");
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
@ -768,6 +781,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
pReq.source = TD_REQ_FROM_TAOX; pReq.source = TD_REQ_FROM_TAOX;
pReq.suid = processSuid(req.suid, pRequest->pDb); pReq.suid = processSuid(req.suid, pRequest->pDb);
uDebug("taosDropStb name:%s suid:%"PRId64" processSuid:%"PRId64, req.name, req.suid, pReq.suid);
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SName tableName = {0}; SName tableName = {0};
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name); tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
@ -825,6 +839,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
SHashObj* pVgroupHashmap = NULL; SHashObj* pVgroupHashmap = NULL;
uDebug("taosCreateTable called");
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
@ -884,7 +899,10 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
if (pCreateReq->type == TSDB_CHILD_TABLE) { if (pCreateReq->type == TSDB_CHILD_TABLE) {
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
SName sName = {0}; SName sName = {0};
tb_uid_t oldSuid = pCreateReq->ctb.suid;
pCreateReq->ctb.suid = processSuid(pCreateReq->ctb.suid, pRequest->pDb); pCreateReq->ctb.suid = processSuid(pCreateReq->ctb.suid, pRequest->pDb);
uDebug("taosCreateTable name:%s sname:%s suid:%"PRId64" processSuid:%"PRId64, pCreateReq->name, pCreateReq->ctb.stbName, pCreateReq->ctb.suid, oldSuid);
toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName); toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta); code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -979,6 +997,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
SHashObj* pVgroupHashmap = NULL; SHashObj* pVgroupHashmap = NULL;
uDebug("taosDropTable called");
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
@ -1023,6 +1042,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
pDropReq = req.pReqs + iReq; pDropReq = req.pReqs + iReq;
pDropReq->igNotExists = true; pDropReq->igNotExists = true;
pDropReq->suid = processSuid(pDropReq->suid, pRequest->pDb); pDropReq->suid = processSuid(pDropReq->suid, pRequest->pDb);
uDebug("taosDropTable name:%s suid:%"PRId64" processSuid:%"PRId64, pDropReq->name, pDropReq->suid, pDropReq->suid);
SVgroupInfo pInfo = {0}; SVgroupInfo pInfo = {0};
SName pName = {0}; SName pName = {0};
@ -1114,6 +1134,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
SDecoder coder = {0}; SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
uDebug("taosDeleteData called");
// decode and process req // decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead); int32_t len = metaLen - sizeof(SMsgHead);
@ -1151,6 +1172,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
SArray* pArray = NULL; SArray* pArray = NULL;
SVgDataBlocks* pVgData = NULL; SVgDataBlocks* pVgData = NULL;
uDebug("taosAlterTable called");
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -1196,6 +1218,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
goto end; goto end;
} }
uDebug("taosAlterTable name:%s", req.tbName);
pArray = taosArrayInit(1, sizeof(void*)); pArray = taosArrayInit(1, sizeof(void*));
if (NULL == pArray) { if (NULL == pArray) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
@ -1261,6 +1284,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
SHashObj* pVgHash = NULL; SHashObj* pVgHash = NULL;
uDebug("taos_write_raw_block_with_fields called");
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) { if (!pRequest) {
@ -1280,6 +1304,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname)); tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
tstrncpy(pName.tname, tbname, sizeof(pName.tname)); tstrncpy(pName.tname, tbname, sizeof(pName.tname));
uDebug("taos_write_raw_block_with_fields name:%s", tbname);
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -1342,6 +1367,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
SHashObj* pVgHash = NULL; SHashObj* pVgHash = NULL;
uDebug("taos_write_raw_block called");
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) { if (!pRequest) {
uError("WriteRaw:createRequest error request is null"); uError("WriteRaw:createRequest error request is null");
@ -1360,6 +1386,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname)); tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
tstrncpy(pName.tname, tbname, sizeof(pName.tname)); tstrncpy(pName.tname, tbname, sizeof(pName.tname));
uDebug("taos_write_raw_block name:%s", tbname);
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -1423,6 +1450,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
SMqRspObj rspObj = {0}; SMqRspObj rspObj = {0};
SDecoder decoder = {0}; SDecoder decoder = {0};
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
uDebug("tmqWriteRawDataImpl called");
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
@ -1468,7 +1496,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
goto end; goto end;
} }
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
uDebug("raw data block num:%d\n", rspObj.rsp.blockNum); uDebug("tmqWriteRawDataImpl raw data block num:%d", rspObj.rsp.blockNum);
while (++rspObj.resIter < rspObj.rsp.blockNum) { while (++rspObj.resIter < rspObj.rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
if (!rspObj.rsp.withSchema) { if (!rspObj.rsp.withSchema) {
@ -1483,7 +1511,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
goto end; goto end;
} }
uDebug("raw data tbname:%s\n", tbName); uDebug("tmqWriteRawDataImpl raw data tbname:%s", tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
strcpy(pName.dbname, pRequest->pDb); strcpy(pName.dbname, pRequest->pDb);
strcpy(pName.tname, tbName); strcpy(pName.tname, tbName);
@ -1556,6 +1584,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
SDecoder decoder = {0}; SDecoder decoder = {0};
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
SVCreateTbReq* pCreateReqDst = NULL; SVCreateTbReq* pCreateReqDst = NULL;
uDebug("tmqWriteRawMetaDataImpl called");
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
@ -1602,7 +1631,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
} }
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
uDebug("raw data block num:%d\n", rspObj.rsp.blockNum); uDebug("tmqWriteRawMetaDataImpl raw data block num:%d", rspObj.rsp.blockNum);
while (++rspObj.resIter < rspObj.rsp.blockNum) { while (++rspObj.resIter < rspObj.rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
if (!rspObj.rsp.withSchema) { if (!rspObj.rsp.withSchema) {
@ -1617,7 +1646,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end; goto end;
} }
uDebug("raw data tbname:%s\n", tbName); uDebug("tmqWriteRawMetaDataImpl raw data tbname:%s\n", tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
strcpy(pName.dbname, pRequest->pDb); strcpy(pName.dbname, pRequest->pDb);
strcpy(pName.tname, tbName); strcpy(pName.tname, tbName);
@ -1726,6 +1755,7 @@ end:
} }
char* tmq_get_json_meta(TAOS_RES* res) { char* tmq_get_json_meta(TAOS_RES* res) {
uDebug("tmq_get_json_meta called");
if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) { if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) {
return NULL; return NULL;
} }
@ -1760,6 +1790,7 @@ char* tmq_get_json_meta(TAOS_RES* res) {
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
uDebug("tmq_get_raw called");
if (!raw || !res) { if (!raw || !res) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -1768,6 +1799,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
raw->raw = pMetaRspObj->metaRsp.metaRsp; raw->raw = pMetaRspObj->metaRsp.metaRsp;
raw->raw_len = pMetaRspObj->metaRsp.metaRspLen; raw->raw_len = pMetaRspObj->metaRsp.metaRspLen;
raw->raw_type = pMetaRspObj->metaRsp.resMsgType; raw->raw_type = pMetaRspObj->metaRsp.resMsgType;
uDebug("tmq_get_raw meta");
} else if (TD_RES_TMQ(res)) { } else if (TD_RES_TMQ(res)) {
SMqRspObj* rspObj = ((SMqRspObj*)res); SMqRspObj* rspObj = ((SMqRspObj*)res);
@ -1787,6 +1819,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
raw->raw = buf; raw->raw = buf;
raw->raw_len = len; raw->raw_len = len;
raw->raw_type = RES_TYPE__TMQ; raw->raw_type = RES_TYPE__TMQ;
uDebug("tmq_get_raw data");
} else if (TD_RES_TMQ_METADATA(res)) { } else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res); SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res);
@ -1806,19 +1839,23 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
raw->raw = buf; raw->raw = buf;
raw->raw_len = len; raw->raw_len = len;
raw->raw_type = RES_TYPE__TMQ_METADATA; raw->raw_type = RES_TYPE__TMQ_METADATA;
uDebug("tmq_get_raw meta data");
} else { } else {
uError("tmq_get_raw error:%d", *(int8_t*)res);
return TSDB_CODE_TMQ_INVALID_MSG; return TSDB_CODE_TMQ_INVALID_MSG;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tmq_free_raw(tmq_raw_data raw) { void tmq_free_raw(tmq_raw_data raw) {
uDebug("tmq_free_raw raw_type:%d", raw.raw_type);
if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) { if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
taosMemoryFree(raw.raw); taosMemoryFree(raw.raw);
} }
} }
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
uDebug("tmq_write_raw called");
if (!taos) { if (!taos) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }

View File

@ -1283,7 +1283,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP; pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
taosWriteQitem(tmq->mqueue, pRspWrapper); taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem);
} }
goto CREATE_MSG_FAIL; goto CREATE_MSG_FAIL;
@ -1910,7 +1909,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
return NULL; return NULL;
} }
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
int32_t retryCnt = 0; int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
if (retryCnt++ > 40) { if (retryCnt++ > 40) {

View File

@ -233,6 +233,7 @@ static const SSysDbTableSchema vgroupsSchema[] = {
{.name = "v4_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "v4_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "v4_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "v4_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "cacheTables", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "tsma", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true}, {.name = "tsma", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true},
// {.name = "compact_start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, // {.name = "compact_start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
}; };

View File

@ -154,6 +154,7 @@ char tsTagFilterCache = 0;
// positive value (in MB) // positive value (in MB)
int32_t tsQueryBufferSize = -1; int32_t tsQueryBufferSize = -1;
int64_t tsQueryBufferSizeBytes = -1; int64_t tsQueryBufferSizeBytes = -1;
int32_t tsCacheLazyLoadThreshold = 500;
int32_t tsDiskCfgNum = 0; int32_t tsDiskCfgNum = 0;
SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0}; SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0};
@ -497,6 +498,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1; if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, 0) != 0) return -1;
GRANT_CFG_ADD; GRANT_CFG_ADD;
return 0; return 0;
} }
@ -824,6 +827,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
} }
tsCacheLazyLoadThreshold = cfgGetItem(pCfg, "cacheLazyLoadThreshold")->i32;
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
GRANT_CFG_GET; GRANT_CFG_GET;

View File

@ -1070,7 +1070,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pload->totalStorage) < 0) return -1; if (tEncodeI64(&encoder, pload->totalStorage) < 0) return -1;
if (tEncodeI64(&encoder, pload->compStorage) < 0) return -1; if (tEncodeI64(&encoder, pload->compStorage) < 0) return -1;
if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1; if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1; if (tEncodeI32(&encoder, pload->numOfCachedTables) < 0) return -1;
if (tEncodeI32(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1; if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1; if (tEncodeI64(&encoder, reserved) < 0) return -1;
} }
@ -1148,7 +1149,8 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &vload.totalStorage) < 0) return -1; if (tDecodeI64(&decoder, &vload.totalStorage) < 0) return -1;
if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1; if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1;
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1; if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1; if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1;
if (tDecodeI32(&decoder, (int32_t*)&reserved) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1; if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1; if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (taosArrayPush(pReq->pVloads, &vload) == NULL) { if (taosArrayPush(pReq->pVloads, &vload) == NULL) {

View File

@ -359,6 +359,7 @@ typedef struct {
int8_t replica; int8_t replica;
SVnodeGid vnodeGid[TSDB_MAX_REPLICA]; SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
void* pTsma; void* pTsma;
int32_t numOfCachedTables;
} SVgObj; } SVgObj;
typedef struct { typedef struct {

View File

@ -113,6 +113,7 @@ typedef struct SMnode {
bool deploy; bool deploy;
char *path; char *path;
int64_t checkTime; int64_t checkTime;
SyncIndex applied;
SSdb *pSdb; SSdb *pSdb;
SArray *pSteps; SArray *pSteps;
SQHandle *pQuery; SQHandle *pQuery;

View File

@ -412,6 +412,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
if (pVgroup != NULL) { if (pVgroup != NULL) {
if (pVload->syncState == TAOS_SYNC_STATE_LEADER) { if (pVload->syncState == TAOS_SYNC_STATE_LEADER) {
pVgroup->cacheUsage = pVload->cacheUsage; pVgroup->cacheUsage = pVload->cacheUsage;
pVgroup->numOfCachedTables = pVload->numOfCachedTables;
pVgroup->numOfTables = pVload->numOfTables; pVgroup->numOfTables = pVload->numOfTables;
pVgroup->numOfTimeSeries = pVload->numOfTimeSeries; pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
pVgroup->totalStorage = pVload->totalStorage; pVgroup->totalStorage = pVload->totalStorage;
@ -440,7 +441,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
if (roleChanged) { if (roleChanged) {
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName); SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb != NULL && pDb->stateTs != curMs) { if (pDb != NULL && pDb->stateTs != curMs) {
mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name, pDb->stateTs, curMs); mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
pDb->stateTs, curMs);
pDb->stateTs = curMs; pDb->stateTs = curMs;
} }
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);

View File

@ -380,11 +380,13 @@ static int32_t mndInitSdb(SMnode *pMnode) {
} }
static int32_t mndOpenSdb(SMnode *pMnode) { static int32_t mndOpenSdb(SMnode *pMnode) {
int32_t code = 0;
if (!pMnode->deploy) { if (!pMnode->deploy) {
return sdbReadFile(pMnode->pSdb); code = sdbReadFile(pMnode->pSdb);
} else {
return 0;
} }
atomic_store_64(&pMnode->applied, pMnode->pSdb->commitIndex);
return code;
} }
static void mndCleanupSdb(SMnode *pMnode) { static void mndCleanupSdb(SMnode *pMnode) {

View File

@ -129,6 +129,14 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
int32_t code = 0; int32_t code = 0;
pMsg->info.conn.applyIndex = pMeta->index;
pMsg->info.conn.applyTerm = pMeta->term;
if (pMsg->code == 0) {
SMnode *pMnode = pFsm->data;
atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
}
if (!syncUtilUserCommit(pMsg->msgType)) { if (!syncUtilUserCommit(pMsg->msgType)) {
goto _out; goto _out;
} }
@ -140,6 +148,11 @@ _out:
return code; return code;
} }
SyncIndex mndSyncAppliedIndex(const SSyncFSM *pFSM) {
SMnode *pMnode = pFSM->data;
return atomic_load_64(&pMnode->applied);
}
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
mInfo("start to read snapshot from sdb in atomic way"); mInfo("start to read snapshot from sdb in atomic way");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
@ -153,7 +166,7 @@ static void mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
} }
void mndRestoreFinish(const SSyncFSM *pFsm) { void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
if (!pMnode->deploy) { if (!pMnode->deploy) {
@ -167,6 +180,8 @@ void mndRestoreFinish(const SSyncFSM *pFsm) {
} else { } else {
mInfo("vgId:1, sync restore finished"); mInfo("vgId:1, sync restore finished");
} }
ASSERT(commitIdx == mndSyncAppliedIndex(pFsm));
} }
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) { int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
@ -253,6 +268,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pMnode; pFsm->data = pMnode;
pFsm->FpCommitCb = mndSyncCommitMsg; pFsm->FpCommitCb = mndSyncCommitMsg;
pFsm->FpAppliedIndexCb = mndSyncAppliedIndex;
pFsm->FpPreCommitCb = NULL; pFsm->FpPreCommitCb = NULL;
pFsm->FpRollBackCb = NULL; pFsm->FpRollBackCb = NULL;
pFsm->FpRestoreFinishCb = mndRestoreFinish; pFsm->FpRestoreFinishCb = mndRestoreFinish;

View File

@ -803,6 +803,9 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
int32_t cacheUsage = (int32_t)pVgroup->cacheUsage; int32_t cacheUsage = (int32_t)pVgroup->cacheUsage;
colDataSetVal(pColInfo, numOfRows, (const char *)&cacheUsage, false); colDataSetVal(pColInfo, numOfRows, (const char *)&cacheUsage, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->numOfCachedTables, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false);

View File

@ -198,9 +198,10 @@ int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32
void *tsdbCacherowsReaderClose(void *pReader); void *tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
size_t tsdbCacheGetCapacity(SVnode *pVnode); size_t tsdbCacheGetCapacity(SVnode *pVnode);
size_t tsdbCacheGetUsage(SVnode *pVnode); size_t tsdbCacheGetUsage(SVnode *pVnode);
int32_t tsdbCacheGetElems(SVnode *pVnode);
// tq // tq
typedef struct SMetaTableInfo { typedef struct SMetaTableInfo {
@ -264,7 +265,7 @@ int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char* id); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret); int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret);
int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);

View File

@ -706,6 +706,7 @@ typedef struct SMergeTree {
bool destroyLoadInfo; bool destroyLoadInfo;
SSttBlockLoadInfo *pLoadInfo; SSttBlockLoadInfo *pLoadInfo;
const char *idStr; const char *idStr;
bool ignoreEarlierTs;
} SMergeTree; } SMergeTree;
typedef struct { typedef struct {
@ -748,9 +749,10 @@ struct SDiskDataBuilder {
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr); bool destroyLoadInfo, const char *idStr, bool strictTimeRange);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeNext(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree);

View File

@ -1116,7 +1116,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->tbSink.pTSchema = pTask->tbSink.pTSchema =
tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, ver1); tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, ver1);
ASSERT(pTask->tbSink.pTSchema); if(pTask->tbSink.pTSchema == NULL) {
return -1;
}
} }
streamSetupTrigger(pTask); streamSetupTrigger(pTask);

View File

@ -120,7 +120,10 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
return -1; return -1;
} }
ASSERT(!(pRsp->withTbName || pRsp->withSchema)); if(pRsp->withTbName || pRsp->withSchema){
tqError("get column should not with meta:%d,%d", pRsp->withTbName, pRsp->withSchema);
return -1;
}
return 0; return 0;
} }

View File

@ -268,8 +268,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
.msgLen = len, .msgLen = len,
.ver = ver, .ver = ver,
}; };
if(qStreamSetScanMemData(task, submit) != 0){
qStreamSetScanMemData(task, submit); continue;
}
// here start to scan submit block to extract the subscribed data // here start to scan submit block to extract the subscribed data
while (1) { while (1) {

View File

@ -709,7 +709,10 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD
sourceIdx++; sourceIdx++;
targetIdx++; targetIdx++;
} else { } else {
ASSERT(0); for (int32_t i = 0; i < pCol->nVal; i++) {
colDataSetNULL(pColData, i);
}
targetIdx++;
} }
} }
} else { } else {
@ -751,7 +754,8 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD
sourceIdx++; sourceIdx++;
break; break;
} else { } else {
ASSERT(0); colDataSetNULL(pColData, i);
break;
} }
} }
} }

View File

@ -632,11 +632,16 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
} }
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid, tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, &(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL); &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
state->pMergeTree = &state->mergeTree; state->pMergeTree = &state->mergeTree;
bool hasVal = tMergeTreeNext(&state->mergeTree); bool hasVal = tMergeTreeNext(&state->mergeTree);
if (!hasVal) { if (!hasVal) {
if (tMergeTreeIgnoreEarlierTs(&state->mergeTree)) {
*pIgnoreEarlierTs = true;
*ppRow = NULL;
return code;
}
state->state = SFSLASTNEXTROW_FILESET; state->state = SFSLASTNEXTROW_FILESET;
goto _next_fileset; goto _next_fileset;
} }
@ -644,16 +649,13 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
} }
case SFSLASTNEXTROW_BLOCKROW: { case SFSLASTNEXTROW_BLOCKROW: {
bool hasVal = false; bool hasVal = false;
do { state->row = tMergeTreeGetRow(&state->mergeTree);
state->row = tMergeTreeGetRow(&state->mergeTree); *ppRow = &state->row;
*ppRow = &state->row; hasVal = tMergeTreeNext(&state->mergeTree);
hasVal = tMergeTreeNext(&state->mergeTree);
} while (TSDBROW_TS(&state->row) <= state->lastTs && hasVal);
if (TSDBROW_TS(&state->row) <= state->lastTs) { if (TSDBROW_TS(&state->row) <= state->lastTs) {
*pIgnoreEarlierTs = true; *pIgnoreEarlierTs = true;
state->state = SFSLASTNEXTROW_FILESET; *ppRow = NULL;
goto _next_fileset; return code;
} }
*pIgnoreEarlierTs = false; *pIgnoreEarlierTs = false;
@ -835,7 +837,13 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk); tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk);
if (block.maxKey.ts <= state->lastTs) { if (block.maxKey.ts <= state->lastTs) {
*pIgnoreEarlierTs = true; *pIgnoreEarlierTs = true;
goto _next_fileset; if (state->pBlockData) {
tBlockDataDestroy(state->pBlockData);
state->pBlockData = NULL;
}
*ppRow = NULL;
return code;
} }
*pIgnoreEarlierTs = false; *pIgnoreEarlierTs = false;
tBlockDataReset(state->pBlockData); tBlockDataReset(state->pBlockData);
@ -1724,6 +1732,15 @@ size_t tsdbCacheGetUsage(SVnode *pVnode) {
return usage; return usage;
} }
int32_t tsdbCacheGetElems(SVnode *pVnode) {
int32_t elems = 0;
if (pVnode->pTsdb != NULL) {
elems = taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
}
return elems;
}
static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) { static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) {
struct { struct {
int32_t fid; int32_t fid;

View File

@ -332,6 +332,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
// retrieve the only one last row of all tables in the uid list. // retrieve the only one last row of all tables in the uid list.
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
int64_t st = taosGetTimestampUs();
for (int32_t i = 0; i < pr->numOfTables; ++i) { for (int32_t i = 0; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = &pr->pTableList[i]; STableKeyInfo* pKeyInfo = &pr->pTableList[i];
@ -407,7 +408,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
if (hasNotNullRow) { if (hasNotNullRow) {
pr->lastTs = minTs; double cost = (taosGetTimestampUs() - st) / 1000.0;
if (cost > tsCacheLazyLoadThreshold) {
pr->lastTs = minTs;
}
} }
} }
@ -417,7 +421,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
if (hasRes) { if (hasRes) {
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr); saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
} }
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) { for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = &pr->pTableList[i]; STableKeyInfo* pKeyInfo = &pr->pTableList[i];

View File

@ -29,9 +29,11 @@ struct SLDataIter {
STimeWindow timeWindow; STimeWindow timeWindow;
SVersionRange verRange; SVersionRange verRange;
SSttBlockLoadInfo *pBlockLoadInfo; SSttBlockLoadInfo *pBlockLoadInfo;
bool ignoreEarlierTs;
}; };
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfSttTrigger) { SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
int32_t numOfSttTrigger) {
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo)); SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
if (pLoadInfo == NULL) { if (pLoadInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -162,7 +164,8 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk; pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1; pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow, idStr); tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow,
idStr);
return &pInfo->blockData[pInfo->currentLoadBlockIndex]; return &pInfo->blockData[pInfo->currentLoadBlockIndex];
_exit: _exit:
@ -263,7 +266,7 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
const char *idStr) { const char *idStr, bool strictTimeRange) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
@ -340,6 +343,17 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
if ((*pIter)->iSttBlk != -1) { if ((*pIter)->iSttBlk != -1) {
(*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk); (*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
(*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1; (*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1;
if ((!backward) && ((strictTimeRange && (*pIter)->pSttBlk->minKey >= (*pIter)->timeWindow.ekey) ||
(!strictTimeRange && (*pIter)->pSttBlk->minKey > (*pIter)->timeWindow.ekey))) {
(*pIter)->pSttBlk = NULL;
}
if (backward && ((strictTimeRange && (*pIter)->pSttBlk->maxKey <= (*pIter)->timeWindow.skey) ||
(!strictTimeRange && (*pIter)->pSttBlk->maxKey < (*pIter)->timeWindow.skey))) {
(*pIter)->pSttBlk = NULL;
(*pIter)->ignoreEarlierTs = true;
}
} }
return code; return code;
@ -421,7 +435,7 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
pBlockData->aUid != NULL) { pBlockData->aUid != NULL) {
i = binarySearchForStartRowIndex((uint64_t *)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward); i = binarySearchForStartRowIndex((uint64_t *)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward);
if (i == -1) { if (i == -1) {
tsdbDebug("failed to find the data in pBlockData, uid:%"PRIu64" , %s", pIter->uid, idStr); tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr);
pIter->iRow = -1; pIter->iRow = -1;
return; return;
} }
@ -508,7 +522,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
} }
// set start row index // set start row index
pIter->iRow = pIter->backward? pBlockData->nRow-1:0; pIter->iRow = pIter->backward ? pBlockData->nRow - 1 : 0;
} }
} }
@ -551,7 +565,7 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr) { bool destroyLoadInfo, const char *idStr, bool strictTimeRange) {
pMTree->backward = backward; pMTree->backward = backward;
pMTree->pIter = NULL; pMTree->pIter = NULL;
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES); pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
@ -569,11 +583,12 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
pMTree->pLoadInfo = pBlockLoadInfo; pMTree->pLoadInfo = pBlockLoadInfo;
pMTree->destroyLoadInfo = destroyLoadInfo; pMTree->destroyLoadInfo = destroyLoadInfo;
pMTree->ignoreEarlierTs = false;
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
struct SLDataIter *pIter = NULL; struct SLDataIter *pIter = NULL;
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
&pMTree->pLoadInfo[i], pMTree->idStr); &pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _end; goto _end;
} }
@ -583,6 +598,9 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
taosArrayPush(pMTree->pIterList, &pIter); taosArrayPush(pMTree->pIterList, &pIter);
tMergeTreeAddIter(pMTree, pIter); tMergeTreeAddIter(pMTree, pIter);
} else { } else {
if (!pMTree->ignoreEarlierTs) {
pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
}
tLDataIterClose(pIter); tLDataIterClose(pIter);
} }
} }
@ -596,6 +614,8 @@ _end:
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); } void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); }
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) { return pMTree->ignoreEarlierTs; }
bool tMergeTreeNext(SMergeTree *pMTree) { bool tMergeTreeNext(SMergeTree *pMTree) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pMTree->pIter) { if (pMTree->pIter) {

View File

@ -315,11 +315,11 @@ static int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables)
} }
if (pBuf->numOfTables > 0) { if (pBuf->numOfTables > 0) {
STableBlockScanInfo **p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData); STableBlockScanInfo** p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData);
taosMemoryFree(*p); taosMemoryFree(*p);
pBuf->numOfTables /= pBuf->numPerBucket; pBuf->numOfTables /= pBuf->numPerBucket;
} }
int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket; int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket;
int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket; int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket;
if (pBuf->pData == NULL) { if (pBuf->pData == NULL) {
@ -919,7 +919,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
pBlockNum->numOfBlocks += 1; pBlockNum->numOfBlocks += 1;
} }
if ((pScanInfo->pBlockList != NULL )&& (taosArrayGetSize(pScanInfo->pBlockList) > 0)) { if ((pScanInfo->pBlockList != NULL) && (taosArrayGetSize(pScanInfo->pBlockList) > 0)) {
numOfQTable += 1; numOfQTable += 1;
} }
} }
@ -1798,7 +1798,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
while (1) { while (1) {
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
if (!hasVal) { // the next value will be the accessed key in stt if (!hasVal) { // the next value will be the accessed key in stt
pScanInfo->lastKeyInStt += step; pScanInfo->lastKeyInStt += step;
return false; return false;
} }
@ -2481,7 +2481,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
pScanInfo->uid, pReader->idStr); pScanInfo->uid, pReader->idStr);
int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange,
pLBlockReader->pInfo, false, pReader->idStr); pLBlockReader->pInfo, false, pReader->idStr, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return false; return false;
} }
@ -3512,7 +3512,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
CHECK_FILEBLOCK_STATE* state) { CHECK_FILEBLOCK_STATE* state) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
bool asc = ASCENDING_TRAVERSE(pReader->order); bool asc = ASCENDING_TRAVERSE(pReader->order);
*state = CHECK_FILEBLOCK_QUIT; *state = CHECK_FILEBLOCK_QUIT;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
@ -3927,7 +3927,8 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
if (code) { if (code) {
return code; return code;
} }
pReader->status.uidList.tableUidList = (uint64_t*)taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num); pReader->status.uidList.tableUidList =
(uint64_t*)taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num);
} }
taosHashClear(pReader->status.pTableMap); taosHashClear(pReader->status.pTableMap);

View File

@ -382,6 +382,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->syncRestore = state.restored; pLoad->syncRestore = state.restored;
pLoad->syncCanRead = state.canRead; pLoad->syncCanRead = state.canRead;
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta); pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
pLoad->totalStorage = (int64_t)3 * 1073741824; pLoad->totalStorage = (int64_t)3 * 1073741824;

View File

@ -306,13 +306,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
void *pReq; void *pReq;
int32_t len; int32_t len;
int32_t ret; int32_t ret;
/*
if (!pVnode->inUse) {
terrno = TSDB_CODE_VND_NO_AVAIL_BUFPOOL;
vError("vgId:%d, not ready to write since %s", TD_VID(pVnode), terrstr());
return -1;
}
*/
if (version <= pVnode->state.applied) { if (version <= pVnode->state.applied) {
vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version, vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version,
pVnode->state.applied); pVnode->state.applied);
@ -326,8 +320,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm); ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
ASSERT(pVnode->state.applied + 1 == version); ASSERT(pVnode->state.applied + 1 == version);
pVnode->state.applied = version; atomic_store_64(&pVnode->state.applied, version);
pVnode->state.applyTerm = pMsg->info.conn.applyTerm; atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm);
if (!syncUtilUserCommit(pMsg->msgType)) goto _exit; if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;

View File

@ -433,7 +433,23 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm
} }
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta); if (pMsg->code == 0) {
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
const STraceId *trace = &pMsg->info.traceId;
SVnode *pVnode = pFsm->data;
vnodePostBlockMsg(pVnode, pMsg);
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code, pMeta->index);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return 0;
} }
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
@ -443,6 +459,11 @@ static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const
return 0; return 0;
} }
static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
SVnode *pVnode = pFSM->data;
return atomic_load_64(&pVnode->state.applied);
}
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
@ -505,21 +526,26 @@ static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *p
return code; return code;
} }
static void vnodeRestoreFinish(const SSyncFSM *pFsm) { static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
SyncIndex appliedIdx = -1;
do { do {
int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE); appliedIdx = vnodeSyncAppliedIndex(pFsm);
if (itemSize == 0) { ASSERT(appliedIdx <= commitIdx);
vInfo("vgId:%d, apply queue is empty, restore finish", pVnode->config.vgId); if (appliedIdx == commitIdx) {
vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId);
break; break;
} else { } else {
vInfo("vgId:%d, restore not finish since %d items in apply queue", pVnode->config.vgId, itemSize); vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
", applied-index:%" PRId64,
pVnode->config.vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
taosMsleep(10); taosMsleep(10);
} }
} while (true); } while (true);
walApplyVer(pVnode->pWal, pVnode->state.applied); ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm));
walApplyVer(pVnode->pWal, commitIdx);
pVnode->restored = true; pVnode->restored = true;
vInfo("vgId:%d, sync restore finished", pVnode->config.vgId); vInfo("vgId:%d, sync restore finished", pVnode->config.vgId);
@ -569,6 +595,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pVnode; pFsm->data = pVnode;
pFsm->FpCommitCb = vnodeSyncCommitMsg; pFsm->FpCommitCb = vnodeSyncCommitMsg;
pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex;
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;

View File

@ -990,19 +990,16 @@ const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
return &pTaskInfo->streamInfo.metaRsp; return &pTaskInfo->streamInfo.metaRsp;
} }
int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo) { int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
return pTaskInfo->streamInfo.prepareStatus.uid; return pTaskInfo->streamInfo.prepareStatus.uid;
} }
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal)); memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal));
return 0; return 0;
} }
@ -1038,20 +1035,12 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#if 0
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t scanVer) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
ASSERT(pTaskInfo->streamInfo.pReq == NULL);
pTaskInfo->streamInfo.pReq = pReq;
pTaskInfo->streamInfo.scanVer = scanVer;
return 0;
}
#endif
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT((pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE )&& (pTaskInfo->streamInfo.submit.msgStr == NULL)); if((pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) || (pTaskInfo->streamInfo.submit.msgStr != NULL)){
qError("qStreamSetScanMemData err:%d,%p", pTaskInfo->execModel, pTaskInfo->streamInfo.submit.msgStr);
return -1;
}
qDebug("set the submit block for future scan"); qDebug("set the submit block for future scan");
pTaskInfo->streamInfo.submit = submit; pTaskInfo->streamInfo.submit = submit;
@ -1061,7 +1050,6 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot; SOperatorInfo* pOperator = pTaskInfo->pRoot;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
pTaskInfo->streamInfo.prepareStatus = *pOffset; pTaskInfo->streamInfo.prepareStatus = *pOffset;
pTaskInfo->streamInfo.returned = 0; pTaskInfo->streamInfo.returned = 0;
@ -1074,7 +1062,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
// TODO add more check // TODO add more check
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
ASSERT(pOperator->numOfDownstream == 1); if(pOperator->numOfDownstream != 1){
qError("pOperator->numOfDownstream != 1:%d", pOperator->numOfDownstream);
return -1;
}
pOperator = pOperator->pDownstream[0]; pOperator = pOperator->pDownstream[0];
} }
@ -1085,6 +1076,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pTSInfo->base.dataReader = NULL; pTSInfo->base.dataReader = NULL;
// let's seek to the next version in wal file // let's seek to the next version in wal file
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) { if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) {
qError("tqSeekVer failed ver:%" PRId64, pOffset->version + 1);
return -1; return -1;
} }
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
@ -1099,6 +1091,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
uid = pTableInfo->uid; uid = pTableInfo->uid;
ts = INT64_MIN; ts = INT64_MIN;
} else { } else {
qError("uid == 0 and tablelist size is 0");
return -1; return -1;
} }
} }
@ -1122,7 +1115,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} }
// TODO after dropping table, table may not found // TODO after dropping table, table may not found
ASSERT(found); if(!found){
qError("uid not found in tablelist %" PRId64, uid);
return -1;
}
if (pTableScanInfo->base.dataReader == NULL) { if (pTableScanInfo->base.dataReader == NULL) {
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
@ -1131,7 +1127,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num, if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num,
pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 || pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 ||
pTableScanInfo->base.dataReader == NULL) { pTableScanInfo->base.dataReader == NULL) {
ASSERT(0); qError("tsdbReaderOpen failed. uid:%" PRIi64, pOffset->uid);
return -1;
} }
} }
@ -1146,7 +1143,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
ts, pTableScanInfo->currentTable, numOfTables); ts, pTableScanInfo->currentTable, numOfTables);
} else { } else {
ASSERT(0); qError("invalid pOffset->type:%d", pOffset->type);
return -1;
} }
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
@ -1178,7 +1176,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
ASSERT(size == 1);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL); tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL);
@ -1207,7 +1204,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
assert(pMsg->info.ahandle != NULL); if(pMsg->info.ahandle == NULL){
qError("pMsg->info.ahandle is NULL");
return;
}
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL}; SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

View File

@ -939,7 +939,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
TSKEY ekey = ascScan ? win.ekey : win.skey; TSKEY ekey = ascScan ? win.ekey : win.skey;
int32_t forwardRows = int32_t forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
ASSERT(forwardRows > 0);
// prev time window not interpolation yet. // prev time window not interpolation yet.
if (pInfo->timeWindowInterpo) { if (pInfo->timeWindowInterpo) {

View File

@ -21,6 +21,7 @@
#include "tudf.h" #include "tudf.h"
#include "tudfInt.h" #include "tudfInt.h"
#include "version.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tdataformat.h" #include "tdataformat.h"
@ -527,6 +528,7 @@ int32_t udfdConnectToMnode() {
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd)); tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
connReq.pid = taosGetPId(); connReq.pid = taosGetPId();
connReq.startTime = taosGetTimestampMs(); connReq.startTime = taosGetTimestampMs();
strcpy(connReq.sVer, version);
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
void *pReq = rpcMallocCont(contLen); void *pReq = rpcMallocCont(contLen);

View File

@ -345,6 +345,7 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
} }
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) { if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
if (errno == E2BIG) { if (errno == E2BIG) {
uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d", (int)kv->length, pColSchema->bytes);
buildInvalidOperationMsg(&pBuf, "value too long"); buildInvalidOperationMsg(&pBuf, "value too long");
ret = TSDB_CODE_PAR_VALUE_TOO_LONG; ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
goto end; goto end;

View File

@ -2160,8 +2160,8 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// append to log buffer // append to log buffer
if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index); sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
terrno = TSDB_CODE_SYN_BUFFER_FULL; ASSERT(terrno != 0);
(void)syncLogFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, TSDB_CODE_SYN_BUFFER_FULL); (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno);
syncEntryDestroy(pEntry); syncEntryDestroy(pEntry);
return -1; return -1;
} }

View File

@ -48,7 +48,16 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
SyncIndex index = pEntry->index; SyncIndex index = pEntry->index;
if (index - pBuf->startIndex >= pBuf->size) { if (index - pBuf->startIndex >= pBuf->size) {
sError("vgId:%d, failed to append due to sync log buffer full. index:%" PRId64 "", pNode->vgId, index); terrno = TSDB_CODE_SYN_BUFFER_FULL;
sError("vgId:%d, failed to append since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
goto _err;
}
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= pBuf->size) {
terrno = TSDB_CODE_SYN_WRITE_STALL;
sError("vgId:%d, failed to append since %s. index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
pNode->vgId, terrstr(), index, pBuf->commitIndex, appliedIndex);
goto _err; goto _err;
} }
@ -475,7 +484,7 @@ _out:
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry, int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
int32_t applyCode) { int32_t applyCode) {
if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) { if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1) {
return 0; return 0;
} }
@ -587,10 +596,10 @@ _out:
// mark as restored if needed // mark as restored if needed
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL && if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
currentTerm <= pEntry->term) { currentTerm <= pEntry->term) {
pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex);
pNode->restoreFinish = true; pNode->restoreFinish = true;
sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, sInfo("vgId:%d, restore finished. term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
} }
if (!inBuf) { if (!inBuf) {

View File

@ -412,7 +412,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_TABLE_LIMITED, "Table creation limite
// sync // sync
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_IS_LEADER, "Sync is leader") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_IS_LEADER, "Sync is leader")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync leader is unreachable")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEW_CONFIG_ERROR, "Sync new config error") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEW_CONFIG_ERROR, "Sync new config error")
@ -420,9 +420,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RECONFIG_NOT_READY, "Sync not ready for re
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for propose") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for propose")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for standby") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for standby")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync is restoring") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync leader is restoring")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
//tq //tq

View File

@ -580,6 +580,16 @@ static size_t taosLRUCacheShardGetUsage(SLRUCacheShard *shard) {
return usage; return usage;
} }
static int32_t taosLRUCacheShardGetElems(SLRUCacheShard *shard) {
int32_t elems = 0;
taosThreadMutexLock(&shard->mutex);
elems = shard->table.elems;
taosThreadMutexUnlock(&shard->mutex);
return elems;
}
static size_t taosLRUCacheShardGetPinnedUsage(SLRUCacheShard *shard) { static size_t taosLRUCacheShardGetPinnedUsage(SLRUCacheShard *shard) {
size_t usage = 0; size_t usage = 0;
@ -755,6 +765,16 @@ size_t taosLRUCacheGetUsage(SLRUCache *cache) {
return usage; return usage;
} }
int32_t taosLRUCacheGetElems(SLRUCache *cache) {
int32_t elems = 0;
for (int i = 0; i < cache->numShards; ++i) {
elems += taosLRUCacheShardGetElems(&cache->shards[i]);
}
return elems;
}
size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache) { size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache) {
size_t usage = 0; size_t usage = 0;

File diff suppressed because it is too large Load Diff

View File

@ -274,6 +274,7 @@ function run_thread() {
# echo "$thread_no ${line} DONE" # echo "$thread_no ${line} DONE"
if [ $ret -eq 0 ]; then if [ $ret -eq 0 ]; then
echo -e "$case_index \e[34m DONE <<<<< \e[0m ${case_info} \e[34m[${total_time}s]\e[0m \e[32m success\e[0m" echo -e "$case_index \e[34m DONE <<<<< \e[0m ${case_info} \e[34m[${total_time}s]\e[0m \e[32m success\e[0m"
flock -x $lock_file -c "echo \"${case_info}|success|${total_time}\" >>${success_case_file}"
else else
if [ ! -z ${web_server} ]; then if [ ! -z ${web_server} ]; then
flock -x $lock_file -c "echo -e \"${hosts[index]} ret:${ret} ${line}\n ${web_server}/$test_log_dir/${case_file}.txt\" >>${failed_case_file}" flock -x $lock_file -c "echo -e \"${hosts[index]} ret:${ret} ${line}\n ${web_server}/$test_log_dir/${case_file}.txt\" >>${failed_case_file}"
@ -365,6 +366,8 @@ lock_file=$log_dir/$$.lock
index_file=$log_dir/case_index.txt index_file=$log_dir/case_index.txt
stat_file=$log_dir/stat.txt stat_file=$log_dir/stat.txt
failed_case_file=$log_dir/failed.txt failed_case_file=$log_dir/failed.txt
success_case_file=$log_dir/success.txt
echo "0" >$index_file echo "0" >$index_file
i=0 i=0

View File

@ -22,7 +22,7 @@ class TDTestCase:
tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)") tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)")
tdSql.query("select count(*) from information_schema.ins_columns") tdSql.query("select count(*) from information_schema.ins_columns")
tdSql.checkData(0, 0, 271) tdSql.checkData(0, 0, 272)
tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'") tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'")
tdSql.checkRows(14) tdSql.checkRows(14)