Merge branch 'main' into fix/TS-2625
This commit is contained in:
commit
4959e8fa7c
|
@ -525,12 +525,13 @@ static void freePayload(const void* key, size_t keyLen, void* value) {
|
||||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
while ((pNode = tdListNext(&iter)) != NULL) {
|
||||||
uint64_t* digest = (uint64_t*)pNode->data;
|
uint64_t* digest = (uint64_t*)pNode->data;
|
||||||
if (digest[0] == p[2] && digest[1] == p[3]) {
|
if (digest[0] == p[2] && digest[1] == p[3]) {
|
||||||
tdListPopNode(&((*pEntry)->list), pNode);
|
void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
|
||||||
|
taosMemoryFree(tmp);
|
||||||
|
|
||||||
int64_t et = taosGetTimestampUs();
|
int64_t et = taosGetTimestampUs();
|
||||||
metaInfo("clear items in cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
|
metaInfo("clear items in cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
|
||||||
(et - st) / 1000.0);
|
(et - st) / 1000.0);
|
||||||
return;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -816,7 +816,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
||||||
// TODO version should be assigned and refed during preprocess
|
// TODO version should be assigned and refed during preprocess
|
||||||
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
|
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
|
||||||
if (pRef == NULL) {
|
if (pRef == NULL) {
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
int64_t ver = pRef->refVer;
|
int64_t ver = pRef->refVer;
|
||||||
|
@ -837,12 +836,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
||||||
|
|
||||||
pHandle->execHandle.task =
|
pHandle->execHandle.task =
|
||||||
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, NULL);
|
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, NULL);
|
||||||
ASSERT(pHandle->execHandle.task);
|
|
||||||
void* scanner = NULL;
|
void* scanner = NULL;
|
||||||
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
|
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
|
||||||
ASSERT(scanner);
|
|
||||||
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
||||||
ASSERT(pHandle->execHandle.pExecReader);
|
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
||||||
|
@ -875,8 +871,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
||||||
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||||
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
|
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
|
||||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||||
// TODO
|
return -1;
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/*ASSERT(pExec->consumerId == req.oldConsumerId);*/
|
/*ASSERT(pExec->consumerId == req.oldConsumerId);*/
|
||||||
|
@ -886,8 +881,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
||||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||||
taosMemoryFree(req.qmsg);
|
taosMemoryFree(req.qmsg);
|
||||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||||
// TODO
|
return -1;
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
// close handle
|
// close handle
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,17 +71,14 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
|
||||||
|
|
||||||
int32_t tqMetaOpen(STQ* pTq) {
|
int32_t tqMetaOpen(STQ* pTq) {
|
||||||
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) {
|
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) {
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) {
|
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) {
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) {
|
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) {
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -197,40 +194,49 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
int32_t vlen;
|
int32_t vlen;
|
||||||
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
||||||
ASSERT(code == 0);
|
|
||||||
|
|
||||||
tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey),
|
tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey),
|
||||||
pHandle->consumerId, TD_VID(pTq->pVnode));
|
pHandle->consumerId, TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
void* buf = taosMemoryCalloc(1, vlen);
|
void* buf = taosMemoryCalloc(1, vlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
ASSERT(0);
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, buf, vlen);
|
tEncoderInit(&encoder, buf, vlen);
|
||||||
|
|
||||||
if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
|
if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
|
||||||
ASSERT(0);
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
TXN* txn;
|
TXN* txn;
|
||||||
|
|
||||||
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
||||||
0) {
|
0) {
|
||||||
ASSERT(0);
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) {
|
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) {
|
||||||
ASSERT(0);
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
|
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
|
||||||
ASSERT(0);
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
|
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
|
||||||
ASSERT(0);
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
|
@ -1086,13 +1086,12 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
||||||
size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
|
size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
|
||||||
char* pPayload = taosMemoryMalloc(size);
|
char* pPayload = taosMemoryMalloc(size);
|
||||||
|
|
||||||
if (numOfTables > 0) {
|
|
||||||
*(int32_t*)pPayload = numOfTables;
|
*(int32_t*)pPayload = numOfTables;
|
||||||
|
if (numOfTables > 0) {
|
||||||
memcpy(pPayload + sizeof(int32_t), taosArrayGet(pUidList, 0), numOfTables * sizeof(uint64_t));
|
memcpy(pPayload + sizeof(int32_t), taosArrayGet(pUidList, 0), numOfTables * sizeof(uint64_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
metaUidFilterCachePut(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, size, 1);
|
metaUidFilterCachePut(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, size, 1);
|
||||||
taosMemoryFree(pPayload);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -87,8 +87,6 @@ int32_t walApplyVer(SWal *pWal, int64_t ver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walCommit(SWal *pWal, int64_t ver) {
|
int32_t walCommit(SWal *pWal, int64_t ver) {
|
||||||
ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
|
|
||||||
ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
|
|
||||||
if (ver < pWal->vers.commitVer) {
|
if (ver < pWal->vers.commitVer) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -138,25 +136,21 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||||
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||||
|
|
||||||
if (pIdxFile == NULL) {
|
if (pIdxFile == NULL) {
|
||||||
ASSERT(0);
|
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
|
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
|
||||||
code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET);
|
code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
ASSERT(0);
|
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// read idx file and get log file pos
|
// read idx file and get log file pos
|
||||||
SWalIdxEntry entry;
|
SWalIdxEntry entry;
|
||||||
if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
|
if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
|
||||||
ASSERT(0);
|
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
ASSERT(entry.ver == ver);
|
|
||||||
|
|
||||||
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
|
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
|
||||||
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||||
|
@ -176,24 +170,19 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||||
}
|
}
|
||||||
// validate offset
|
// validate offset
|
||||||
SWalCkHead head;
|
SWalCkHead head;
|
||||||
ASSERT(taosValidFile(pLogFile));
|
|
||||||
int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead));
|
int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead));
|
||||||
if (size != sizeof(SWalCkHead)) {
|
if (size != sizeof(SWalCkHead)) {
|
||||||
ASSERT(0);
|
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
code = walValidHeadCksum(&head);
|
code = walValidHeadCksum(&head);
|
||||||
|
|
||||||
ASSERT(code == 0);
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
ASSERT(0);
|
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (head.head.version != ver) {
|
if (head.head.version != ver) {
|
||||||
ASSERT(0);
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -202,22 +191,17 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||||
// truncate old files
|
// truncate old files
|
||||||
code = taosFtruncateFile(pLogFile, entry.offset);
|
code = taosFtruncateFile(pLogFile, entry.offset);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
ASSERT(0);
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
code = taosFtruncateFile(pIdxFile, idxOff);
|
code = taosFtruncateFile(pIdxFile, idxOff);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
ASSERT(0);
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pWal->vers.lastVer = ver - 1;
|
pWal->vers.lastVer = ver - 1;
|
||||||
if (pWal->vers.lastVer < pWal->vers.firstVer) {
|
|
||||||
ASSERT(pWal->vers.lastVer == pWal->vers.firstVer - 1);
|
|
||||||
}
|
|
||||||
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
|
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
|
||||||
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
|
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
|
||||||
taosCloseFile(&pIdxFile);
|
taosCloseFile(&pIdxFile);
|
||||||
|
@ -386,7 +370,7 @@ int32_t walEndSnapshot(SWal *pWal) {
|
||||||
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
|
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
|
||||||
wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
|
wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
|
||||||
if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) {
|
if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) {
|
||||||
ASSERT(0);
|
goto END;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosArrayClear(pWal->toDeleteFiles);
|
taosArrayClear(pWal->toDeleteFiles);
|
||||||
|
@ -441,7 +425,6 @@ int32_t walRollImpl(SWal *pWal) {
|
||||||
pWal->pIdxFile = pIdxFile;
|
pWal->pIdxFile = pIdxFile;
|
||||||
pWal->pLogFile = pLogFile;
|
pWal->pLogFile = pLogFile;
|
||||||
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
||||||
ASSERT(pWal->writeCur >= 0);
|
|
||||||
|
|
||||||
pWal->lastRollSeq = walGetSeq();
|
pWal->lastRollSeq = walGetSeq();
|
||||||
|
|
||||||
|
@ -458,8 +441,6 @@ END:
|
||||||
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||||
SWalIdxEntry entry = {.ver = ver, .offset = offset};
|
SWalIdxEntry entry = {.ver = ver, .offset = offset};
|
||||||
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
|
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
|
||||||
ASSERT(pFileInfo != NULL);
|
|
||||||
ASSERT(pFileInfo->firstVer >= 0);
|
|
||||||
int64_t idxOffset = (entry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry);
|
int64_t idxOffset = (entry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry);
|
||||||
wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset,
|
wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset,
|
||||||
idxOffset);
|
idxOffset);
|
||||||
|
@ -476,7 +457,6 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||||
if (endOffset < 0) {
|
if (endOffset < 0) {
|
||||||
wFatal("vgId:%d, failed to seek end of idxfile due to %s. ver:%" PRId64 "", pWal->cfg.vgId, strerror(errno), ver);
|
wFatal("vgId:%d, failed to seek end of idxfile due to %s. ver:%" PRId64 "", pWal->cfg.vgId, strerror(errno), ver);
|
||||||
}
|
}
|
||||||
ASSERT(endOffset == idxOffset + sizeof(SWalIdxEntry) && "Offset of idx entries misaligned");
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -486,9 +466,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
||||||
|
|
||||||
int64_t offset = walGetCurFileOffset(pWal);
|
int64_t offset = walGetCurFileOffset(pWal);
|
||||||
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
|
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
|
||||||
ASSERT(pFileInfo != NULL);
|
|
||||||
|
|
||||||
ASSERT(pFileInfo->firstVer != -1);
|
|
||||||
pWal->writeHead.head.version = index;
|
pWal->writeHead.head.version = index;
|
||||||
pWal->writeHead.head.bodyLen = bodyLen;
|
pWal->writeHead.head.bodyLen = bodyLen;
|
||||||
pWal->writeHead.head.msgType = msgType;
|
pWal->writeHead.head.msgType = msgType;
|
||||||
|
@ -525,7 +503,6 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
||||||
|
|
||||||
// set status
|
// set status
|
||||||
if (pWal->vers.firstVer == -1) {
|
if (pWal->vers.firstVer == -1) {
|
||||||
ASSERT(index == 0);
|
|
||||||
pWal->vers.firstVer = 0;
|
pWal->vers.firstVer = 0;
|
||||||
}
|
}
|
||||||
pWal->vers.lastVer = index;
|
pWal->vers.lastVer = index;
|
||||||
|
@ -541,7 +518,6 @@ END:
|
||||||
wFatal("vgId:%d, failed to ftruncate logfile to offset:%" PRId64 " during recovery due to %s", pWal->cfg.vgId,
|
wFatal("vgId:%d, failed to ftruncate logfile to offset:%" PRId64 " during recovery due to %s", pWal->cfg.vgId,
|
||||||
offset, strerror(errno));
|
offset, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
ASSERT(0 && "failed to recover from error");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t idxOffset = (index - pFileInfo->firstVer) * sizeof(SWalIdxEntry);
|
int64_t idxOffset = (index - pFileInfo->firstVer) * sizeof(SWalIdxEntry);
|
||||||
|
@ -549,7 +525,6 @@ END:
|
||||||
wFatal("vgId:%d, failed to ftruncate idxfile to offset:%" PRId64 "during recovery due to %s", pWal->cfg.vgId,
|
wFatal("vgId:%d, failed to ftruncate idxfile to offset:%" PRId64 "during recovery due to %s", pWal->cfg.vgId,
|
||||||
idxOffset, strerror(errno));
|
idxOffset, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
ASSERT(0 && "failed to recover from error");
|
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -576,8 +551,6 @@ int64_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pWal->pLogFile != NULL && pWal->pIdxFile != NULL && pWal->writeCur >= 0);
|
|
||||||
|
|
||||||
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -614,8 +587,6 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0);
|
|
||||||
|
|
||||||
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -28,7 +28,7 @@ void pressOtherKey(char c);
|
||||||
bool shellAutoInit();
|
bool shellAutoInit();
|
||||||
|
|
||||||
// set conn
|
// set conn
|
||||||
void shellSetConn(TAOS* conn);
|
void shellSetConn(TAOS* conn, bool runOnce);
|
||||||
|
|
||||||
// exit shell auto funciton, shell exit call once
|
// exit shell auto funciton, shell exit call once
|
||||||
void shellAutoExit();
|
void shellAutoExit();
|
||||||
|
|
|
@ -332,6 +332,7 @@ bool varMode = false; // enter var names list mode
|
||||||
|
|
||||||
TAOS* varCon = NULL;
|
TAOS* varCon = NULL;
|
||||||
SShellCmd* varCmd = NULL;
|
SShellCmd* varCmd = NULL;
|
||||||
|
bool varRunOnce = false;
|
||||||
SMatch* lastMatch = NULL; // save last match result
|
SMatch* lastMatch = NULL; // save last match result
|
||||||
int cntDel = 0; // delete byte count after next press tab
|
int cntDel = 0; // delete byte count after next press tab
|
||||||
|
|
||||||
|
@ -375,7 +376,7 @@ void showHelp() {
|
||||||
----- C ----- \n\
|
----- C ----- \n\
|
||||||
create table <tb_name> using <stb_name> tags ...\n\
|
create table <tb_name> using <stb_name> tags ...\n\
|
||||||
create database <db_name> <db_options> ...\n\
|
create database <db_name> <db_options> ...\n\
|
||||||
create dnode \"fqdn:port\"n\
|
create dnode \"fqdn:port\" ...\n\
|
||||||
create index ...\n\
|
create index ...\n\
|
||||||
create mnode on dnode <dnode_id> ;\n\
|
create mnode on dnode <dnode_id> ;\n\
|
||||||
create qnode on dnode <dnode_id> ;\n\
|
create qnode on dnode <dnode_id> ;\n\
|
||||||
|
@ -637,10 +638,11 @@ bool shellAutoInit() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// set conn
|
// set conn
|
||||||
void shellSetConn(TAOS* conn) {
|
void shellSetConn(TAOS* conn, bool runOnce) {
|
||||||
varCon = conn;
|
varCon = conn;
|
||||||
|
varRunOnce = runOnce;
|
||||||
// init database and stable
|
// init database and stable
|
||||||
updateTireValue(WT_VAR_DBNAME, false);
|
if (!runOnce) updateTireValue(WT_VAR_DBNAME, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// exit shell auto funciton, shell exit call once
|
// exit shell auto funciton, shell exit call once
|
||||||
|
@ -784,6 +786,12 @@ int writeVarNames(int type, TAOS_RES* tres) {
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setThreadNull(int type) {
|
||||||
|
taosThreadMutexLock(&tiresMutex);
|
||||||
|
threads[type] = NULL;
|
||||||
|
taosThreadMutexUnlock(&tiresMutex);
|
||||||
|
}
|
||||||
|
|
||||||
bool firstMatchCommand(TAOS* con, SShellCmd* cmd);
|
bool firstMatchCommand(TAOS* con, SShellCmd* cmd);
|
||||||
//
|
//
|
||||||
// thread obtain var thread from db server
|
// thread obtain var thread from db server
|
||||||
|
@ -799,6 +807,7 @@ void* varObtainThread(void* param) {
|
||||||
TAOS_RES* pSql = taos_query(varCon, varSqls[type]);
|
TAOS_RES* pSql = taos_query(varCon, varSqls[type]);
|
||||||
if (taos_errno(pSql)) {
|
if (taos_errno(pSql)) {
|
||||||
taos_free_result(pSql);
|
taos_free_result(pSql);
|
||||||
|
setThreadNull(type);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -814,6 +823,7 @@ void* varObtainThread(void* param) {
|
||||||
firstMatchCommand(varCon, varCmd);
|
firstMatchCommand(varCon, varCmd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setThreadNull(type);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1725,10 +1735,12 @@ _return:
|
||||||
|
|
||||||
// main key press tab
|
// main key press tab
|
||||||
void pressTabKey(SShellCmd* cmd) {
|
void pressTabKey(SShellCmd* cmd) {
|
||||||
// check
|
// check empty tab key
|
||||||
if (cmd->commandSize == 0) {
|
if (cmd->commandSize == 0) {
|
||||||
// empty
|
// have multi line tab key
|
||||||
|
if(cmd->bufferSize == 0) {
|
||||||
showHelp();
|
showHelp();
|
||||||
|
}
|
||||||
shellShowOnScreen(cmd);
|
shellShowOnScreen(cmd);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1977,7 +1989,7 @@ void callbackAutoTab(char* sqlstr, TAOS* pSql, bool usedb) {
|
||||||
|
|
||||||
if (dealUseDB(sql)) {
|
if (dealUseDB(sql)) {
|
||||||
// change to new db
|
// change to new db
|
||||||
updateTireValue(WT_VAR_STABLE, false);
|
if (!varRunOnce) updateTireValue(WT_VAR_STABLE, false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1097,10 +1097,11 @@ int32_t shellExecute() {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
shellSetConn(shell.conn);
|
bool runOnce = pArgs->commands != NULL || pArgs->file[0] != 0;
|
||||||
|
shellSetConn(shell.conn, runOnce);
|
||||||
shellReadHistory();
|
shellReadHistory();
|
||||||
|
|
||||||
if (pArgs->commands != NULL || pArgs->file[0] != 0) {
|
if (runOnce) {
|
||||||
if (pArgs->commands != NULL) {
|
if (pArgs->commands != NULL) {
|
||||||
printf("%s%s\r\n", shell.info.promptHeader, pArgs->commands);
|
printf("%s%s\r\n", shell.info.promptHeader, pArgs->commands);
|
||||||
char *cmd = strdup(pArgs->commands);
|
char *cmd = strdup(pArgs->commands);
|
||||||
|
@ -1160,5 +1161,8 @@ int32_t shellExecute() {
|
||||||
taosThreadJoin(spid, NULL);
|
taosThreadJoin(spid, NULL);
|
||||||
|
|
||||||
shellCleanupHistory();
|
shellCleanupHistory();
|
||||||
|
taos_kill_query(shell.conn);
|
||||||
|
taos_close(shell.conn);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue