Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
This commit is contained in:
commit
442f0db103
|
@ -21,13 +21,11 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "executorimpl.h"
|
#include "thash.h"
|
||||||
|
|
||||||
#define DS_CAPACITY_ENOUGH 1
|
#define DS_BUF_LOW 1
|
||||||
#define DS_CAPACITY_FULL 2
|
#define DS_BUF_FULL 2
|
||||||
#define DS_NEED_SCHEDULE 3
|
#define DS_BUF_EMPTY 3
|
||||||
#define DS_END 4
|
|
||||||
#define DS_IN_PROCESS 5
|
|
||||||
|
|
||||||
struct SDataSink;
|
struct SDataSink;
|
||||||
struct SSDataBlock;
|
struct SSDataBlock;
|
||||||
|
@ -42,15 +40,20 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg);
|
||||||
typedef void* DataSinkHandle;
|
typedef void* DataSinkHandle;
|
||||||
|
|
||||||
typedef struct SInputData {
|
typedef struct SInputData {
|
||||||
const SSDataBlock* pData;
|
const struct SSDataBlock* pData;
|
||||||
SHashObj* pTableRetrieveTsMap;
|
SHashObj* pTableRetrieveTsMap;
|
||||||
} SInputData;
|
} SInputData;
|
||||||
|
|
||||||
typedef struct SOutPutData {
|
typedef struct SOutputData {
|
||||||
int32_t numOfRows;
|
int32_t numOfRows;
|
||||||
int8_t compressed;
|
int8_t compressed;
|
||||||
char* pData;
|
char* pData;
|
||||||
} SOutPutData;
|
bool queryEnd;
|
||||||
|
bool needSchedule;
|
||||||
|
int32_t bufStatus;
|
||||||
|
int64_t useconds;
|
||||||
|
int8_t precision;
|
||||||
|
} SOutputData;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a subplan's datasinker handle for all later operations.
|
* Create a subplan's datasinker handle for all later operations.
|
||||||
|
@ -66,16 +69,16 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
|
||||||
* @param pRes
|
* @param pRes
|
||||||
* @return error code
|
* @return error code
|
||||||
*/
|
*/
|
||||||
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus);
|
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue);
|
||||||
|
|
||||||
void dsEndPut(DataSinkHandle handle);
|
void dsEndPut(DataSinkHandle handle, int64_t useconds);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the length of the data returned by the next call to dsGetDataBlock.
|
* Get the length of the data returned by the next call to dsGetDataBlock.
|
||||||
* @param handle
|
* @param handle
|
||||||
* @return data length
|
* @param pLen data length
|
||||||
*/
|
*/
|
||||||
int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus);
|
void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get data, the caller needs to allocate data memory.
|
* Get data, the caller needs to allocate data memory.
|
||||||
|
@ -84,7 +87,7 @@ int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus);
|
||||||
* @param pStatus output
|
* @param pStatus output
|
||||||
* @return error code
|
* @return error code
|
||||||
*/
|
*/
|
||||||
int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus);
|
int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue.
|
* After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue.
|
|
@ -31,10 +31,10 @@ typedef struct SDataSinkManager {
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
} SDataSinkManager;
|
} SDataSinkManager;
|
||||||
|
|
||||||
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus);
|
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
|
||||||
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle);
|
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, int64_t useconds);
|
||||||
typedef int32_t (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pStatus);
|
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd);
|
||||||
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus);
|
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
|
||||||
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
|
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
|
||||||
|
|
||||||
typedef struct SDataSinkHandle {
|
typedef struct SDataSinkHandle {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "tcompression.h"
|
#include "tcompression.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
|
#include "executorimpl.h"
|
||||||
|
|
||||||
#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp))
|
#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp))
|
||||||
|
|
||||||
|
@ -42,6 +43,8 @@ typedef struct SDataDispatchHandle {
|
||||||
STaosQueue* pDataBlocks;
|
STaosQueue* pDataBlocks;
|
||||||
SDataDispatchBuf nextOutput;
|
SDataDispatchBuf nextOutput;
|
||||||
int32_t status;
|
int32_t status;
|
||||||
|
bool queryEnd;
|
||||||
|
int64_t useconds;
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
} SDataDispatchHandle;
|
} SDataDispatchHandle;
|
||||||
|
|
||||||
|
@ -124,7 +127,9 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
|
||||||
|
|
||||||
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
|
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
|
||||||
pthread_mutex_lock(&pDispatcher->mutex);
|
pthread_mutex_lock(&pDispatcher->mutex);
|
||||||
int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_CAPACITY_FULL;
|
int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks);
|
||||||
|
int32_t status = (0 == blockNums ? DS_BUF_EMPTY :
|
||||||
|
(blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
|
||||||
pDispatcher->status = status;
|
pDispatcher->status = status;
|
||||||
pthread_mutex_unlock(&pDispatcher->mutex);
|
pthread_mutex_unlock(&pDispatcher->mutex);
|
||||||
return status;
|
return status;
|
||||||
|
@ -137,7 +142,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus) {
|
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
|
||||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||||
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
|
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
|
||||||
if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
|
if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
|
||||||
|
@ -145,38 +150,46 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
|
||||||
}
|
}
|
||||||
toDataCacheEntry(pDispatcher, pInput, pBuf);
|
toDataCacheEntry(pDispatcher, pInput, pBuf);
|
||||||
taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
|
taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
|
||||||
*pStatus = updateStatus(pDispatcher);
|
*pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void endPut(struct SDataSinkHandle* pHandle) {
|
static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) {
|
||||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||||
pthread_mutex_lock(&pDispatcher->mutex);
|
pthread_mutex_lock(&pDispatcher->mutex);
|
||||||
pDispatcher->status = DS_END;
|
pDispatcher->queryEnd = true;
|
||||||
|
pDispatcher->useconds = useconds;
|
||||||
pthread_mutex_unlock(&pDispatcher->mutex);
|
pthread_mutex_unlock(&pDispatcher->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) {
|
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) {
|
||||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||||
if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
|
if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
|
||||||
*pStatus = getStatus(pDispatcher) ? DS_END : DS_IN_PROCESS;
|
*pQueryEnd = pDispatcher->queryEnd;
|
||||||
return 0;
|
*pLen = 0;
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
SDataDispatchBuf* pBuf = NULL;
|
SDataDispatchBuf* pBuf = NULL;
|
||||||
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
||||||
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
|
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
|
||||||
taosFreeQitem(pBuf);
|
taosFreeQitem(pBuf);
|
||||||
return ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
|
*pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus) {
|
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
||||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
|
||||||
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
|
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
|
||||||
pOutput->numOfRows = pEntry->numOfRows;
|
pOutput->numOfRows = pEntry->numOfRows;
|
||||||
pOutput->compressed = pEntry->compressed;
|
pOutput->compressed = pEntry->compressed;
|
||||||
tfree(pDispatcher->nextOutput.pData); // todo persistent
|
tfree(pDispatcher->nextOutput.pData); // todo persistent
|
||||||
*pStatus = updateStatus(pDispatcher);
|
pOutput->bufStatus = updateStatus(pDispatcher);
|
||||||
|
pthread_mutex_lock(&pDispatcher->mutex);
|
||||||
|
pOutput->queryEnd = pDispatcher->queryEnd;
|
||||||
|
pOutput->needSchedule = false;
|
||||||
|
pOutput->useconds = pDispatcher->useconds;
|
||||||
|
pOutput->precision = pDispatcher->schema.precision;
|
||||||
|
pthread_mutex_unlock(&pDispatcher->mutex);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -205,7 +218,8 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataS
|
||||||
dispatcher->sink.fDestroy = destroyDataSinker;
|
dispatcher->sink.fDestroy = destroyDataSinker;
|
||||||
dispatcher->pManager = pManager;
|
dispatcher->pManager = pManager;
|
||||||
dispatcher->schema = pDataSink->schema;
|
dispatcher->schema = pDataSink->schema;
|
||||||
dispatcher->status = DS_CAPACITY_ENOUGH;
|
dispatcher->status = DS_BUF_EMPTY;
|
||||||
|
dispatcher->queryEnd = false;
|
||||||
dispatcher->pDataBlocks = taosOpenQueue();
|
dispatcher->pDataBlocks = taosOpenQueue();
|
||||||
pthread_mutex_init(&dispatcher->mutex, NULL);
|
pthread_mutex_init(&dispatcher->mutex, NULL);
|
||||||
if (NULL == dispatcher->pDataBlocks) {
|
if (NULL == dispatcher->pDataBlocks) {
|
||||||
|
|
|
@ -31,24 +31,24 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus) {
|
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) {
|
||||||
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
||||||
return pHandleImpl->fPut(pHandleImpl, pInput, pStatus);
|
return pHandleImpl->fPut(pHandleImpl, pInput, pContinue);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dsEndPut(DataSinkHandle handle) {
|
void dsEndPut(DataSinkHandle handle, int64_t useconds) {
|
||||||
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
||||||
return pHandleImpl->fEndPut(pHandleImpl);
|
return pHandleImpl->fEndPut(pHandleImpl, useconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus) {
|
void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
|
||||||
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
||||||
return pHandleImpl->fGetLen(pHandleImpl, pStatus);
|
pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus) {
|
int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
|
||||||
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
||||||
return pHandleImpl->fGetData(pHandleImpl, pOutput, pStatus);
|
return pHandleImpl->fGetData(pHandleImpl, pOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dsScheduleProcess(void* ahandle, void* pItem) {
|
void dsScheduleProcess(void* ahandle, void* pItem) {
|
||||||
|
|
|
@ -13,7 +13,6 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
|
||||||
#include "cJSON.h"
|
#include "cJSON.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
@ -33,6 +32,24 @@ static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
|
||||||
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
|
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void* tmemmem(char* haystack, int hlen, char* needle, int nlen) {
|
||||||
|
char* limit;
|
||||||
|
|
||||||
|
if (nlen == 0 || hlen < nlen) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
limit = haystack + hlen - nlen + 1;
|
||||||
|
while ((haystack = (char*)memchr(
|
||||||
|
haystack, needle[0], limit - haystack)) != NULL) {
|
||||||
|
if (memcmp(haystack, needle, nlen) == 0) {
|
||||||
|
return haystack;
|
||||||
|
}
|
||||||
|
haystack++;
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static inline int64_t walScanLogGetLastVer(SWal* pWal) {
|
static inline int64_t walScanLogGetLastVer(SWal* pWal) {
|
||||||
ASSERT(pWal->fileInfoSet != NULL);
|
ASSERT(pWal->fileInfoSet != NULL);
|
||||||
int sz = taosArrayGetSize(pWal->fileInfoSet);
|
int sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
|
@ -47,7 +64,8 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) {
|
||||||
|
|
||||||
struct stat statbuf;
|
struct stat statbuf;
|
||||||
stat(fnameStr, &statbuf);
|
stat(fnameStr, &statbuf);
|
||||||
int readSize = MIN(WAL_MAX_SIZE, statbuf.st_size);
|
int readSize = MIN(WAL_MAX_SIZE + 2, statbuf.st_size);
|
||||||
|
pLastFileInfo->fileSize = statbuf.st_size;
|
||||||
|
|
||||||
FileFd fd = taosOpenFileRead(fnameStr);
|
FileFd fd = taosOpenFileRead(fnameStr);
|
||||||
if (fd < 0) {
|
if (fd < 0) {
|
||||||
|
@ -64,6 +82,7 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosLSeekFile(fd, -readSize, SEEK_END);
|
||||||
if (readSize != taosReadFile(fd, buf, readSize)) {
|
if (readSize != taosReadFile(fd, buf, readSize)) {
|
||||||
free(buf);
|
free(buf);
|
||||||
taosCloseFile(fd);
|
taosCloseFile(fd);
|
||||||
|
@ -71,21 +90,25 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* found = strstr(buf, (const char*)&magic);
|
char* haystack = buf;
|
||||||
if (found == NULL) {
|
char* found = NULL;
|
||||||
ASSERT(false);
|
char *candidate;
|
||||||
// file has to be deleted
|
while((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
|
||||||
free(buf);
|
|
||||||
taosCloseFile(fd);
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
char *another;
|
|
||||||
while((another = strstr(found + 1, (const char*)&magic)) != NULL) {
|
|
||||||
// read and validate
|
// read and validate
|
||||||
SWalHead *logContent = (SWalHead*)another;
|
SWalHead *logContent = (SWalHead*)candidate;
|
||||||
if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
|
if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
|
||||||
found = another;
|
found = candidate;
|
||||||
|
}
|
||||||
|
haystack = candidate + 1;
|
||||||
|
}
|
||||||
|
if (found == buf) {
|
||||||
|
SWalHead *logContent = (SWalHead*)found;
|
||||||
|
if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) {
|
||||||
|
// file has to be deleted
|
||||||
|
free(buf);
|
||||||
|
taosCloseFile(fd);
|
||||||
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosCloseFile(fd);
|
taosCloseFile(fd);
|
||||||
|
@ -120,7 +143,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
SWalFileInfo fileInfo;
|
SWalFileInfo fileInfo;
|
||||||
memset(&fileInfo, -1, sizeof(SWalFileInfo));
|
memset(&fileInfo, -1, sizeof(SWalFileInfo));
|
||||||
sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
|
sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
|
||||||
FileFd fd = taosOpenFileRead(ent->d_name);
|
|
||||||
//get lastVer
|
//get lastVer
|
||||||
//get size
|
//get size
|
||||||
taosArrayPush(pLogInfoArray, &fileInfo);
|
taosArrayPush(pLogInfoArray, &fileInfo);
|
||||||
|
@ -137,28 +159,25 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
}
|
}
|
||||||
int newSz = taosArrayGetSize(pLogInfoArray);
|
int newSz = taosArrayGetSize(pLogInfoArray);
|
||||||
// case 1. meta file not exist / cannot be parsed
|
// case 1. meta file not exist / cannot be parsed
|
||||||
if (pWal->fileInfoSet == NULL && newSz != 0) {
|
if (oldSz < newSz) {
|
||||||
// recover fileInfo set
|
|
||||||
pWal->fileInfoSet = pLogInfoArray;
|
|
||||||
if (newSz != 0) {
|
|
||||||
// recover meta version
|
|
||||||
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pLogInfoArray, 0))->firstVer;
|
|
||||||
pWal->writeCur = newSz - 1;
|
|
||||||
}
|
|
||||||
// recover file size
|
|
||||||
} else if (oldSz < newSz) {
|
|
||||||
for (int i = oldSz; i < newSz; i++) {
|
for (int i = oldSz; i < newSz; i++) {
|
||||||
SWalFileInfo *pFileInfo = taosArrayGet(pLogInfoArray, i);
|
SWalFileInfo *pFileInfo = taosArrayGet(pLogInfoArray, i);
|
||||||
taosArrayPush(pWal->fileInfoSet, pFileInfo);
|
taosArrayPush(pWal->fileInfoSet, pFileInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
pWal->writeCur = newSz - 1;
|
pWal->writeCur = newSz - 1;
|
||||||
|
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pLogInfoArray, 0))->firstVer;
|
||||||
|
pWal->vers.lastVer = walScanLogGetLastVer(pWal);
|
||||||
|
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer;
|
||||||
|
ASSERT(pWal->vers.lastVer != -1);
|
||||||
|
|
||||||
|
int code = walSaveMeta(pWal);
|
||||||
|
if (code < 0) {
|
||||||
|
taosArrayDestroy(pLogInfoArray);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pWal->fileInfoSet && taosArrayGetSize(pWal->fileInfoSet) != 0) {
|
|
||||||
pWal->vers.lastVer = walScanLogGetLastVer(pWal);
|
|
||||||
ASSERT(pWal->vers.lastVer != -1);
|
|
||||||
}
|
|
||||||
|
|
||||||
// case 2. versions in meta not match log
|
// case 2. versions in meta not match log
|
||||||
// or some log not included in meta
|
// or some log not included in meta
|
||||||
// (e.g. program killed)
|
// (e.g. program killed)
|
||||||
|
@ -182,14 +201,11 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int code = walSaveMeta(pWal);
|
|
||||||
if (code < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// get last version of this file
|
// get last version of this file
|
||||||
//
|
//
|
||||||
// rebuild meta
|
// rebuild meta
|
||||||
|
taosArrayDestroy(pLogInfoArray);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -397,6 +413,10 @@ int walLoadMeta(SWal* pWal) {
|
||||||
}
|
}
|
||||||
memset(buf, 0, size + 5);
|
memset(buf, 0, size + 5);
|
||||||
FileFd fd = taosOpenFileRead(fnameStr);
|
FileFd fd = taosOpenFileRead(fnameStr);
|
||||||
|
if (fd < 0) {
|
||||||
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
if (taosReadFile(fd, buf, size) != size) {
|
if (taosReadFile(fd, buf, size) != size) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
taosCloseFile(fd);
|
taosCloseFile(fd);
|
||||||
|
|
|
@ -106,6 +106,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
// init write buffer
|
// init write buffer
|
||||||
memset(&pWal->writeHead, 0, sizeof(SWalHead));
|
memset(&pWal->writeHead, 0, sizeof(SWalHead));
|
||||||
pWal->writeHead.head.headVer = WAL_HEAD_VER;
|
pWal->writeHead.head.headVer = WAL_HEAD_VER;
|
||||||
|
pWal->writeHead.magic = WAL_MAGIC;
|
||||||
|
|
||||||
if (pthread_mutex_init(&pWal->mutex, NULL) < 0) {
|
if (pthread_mutex_init(&pWal->mutex, NULL) < 0) {
|
||||||
taosArrayDestroy(pWal->fileInfoSet);
|
taosArrayDestroy(pWal->fileInfoSet);
|
||||||
|
@ -121,7 +122,9 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walLoadMeta(pWal) < 0 && walCheckAndRepairMeta(pWal) < 0) {
|
walLoadMeta(pWal);
|
||||||
|
|
||||||
|
if (walCheckAndRepairMeta(pWal) < 0) {
|
||||||
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
||||||
pthread_mutex_destroy(&pWal->mutex);
|
pthread_mutex_destroy(&pWal->mutex);
|
||||||
taosArrayDestroy(pWal->fileInfoSet);
|
taosArrayDestroy(pWal->fileInfoSet);
|
||||||
|
@ -130,6 +133,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walCheckAndRepairIdx(pWal) < 0) {
|
if (walCheckAndRepairIdx(pWal) < 0) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
|
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
|
||||||
|
|
|
@ -340,7 +340,10 @@ TEST_F(WalRetentionEnv, repairMeta1) {
|
||||||
char buf[100];
|
char buf[100];
|
||||||
sprintf(buf, "%s/meta-ver%d", pathName, 0);
|
sprintf(buf, "%s/meta-ver%d", pathName, 0);
|
||||||
remove(buf);
|
remove(buf);
|
||||||
|
sprintf(buf, "%s/meta-ver%d", pathName, 1);
|
||||||
|
remove(buf);
|
||||||
SetUp();
|
SetUp();
|
||||||
|
//getchar();
|
||||||
|
|
||||||
ASSERT_EQ(pWal->vers.lastVer, 99);
|
ASSERT_EQ(pWal->vers.lastVer, 99);
|
||||||
|
|
||||||
|
@ -377,4 +380,26 @@ TEST_F(WalRetentionEnv, repairMeta1) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
int ver = rand() % 200;
|
||||||
|
code = walReadWithHandle(pRead, ver);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
// printf("rrbody: \n");
|
||||||
|
// for(int i = 0; i < pRead->pHead->head.len; i++) {
|
||||||
|
// printf("%d ", pRead->pHead->head.body[i]);
|
||||||
|
//}
|
||||||
|
// printf("\n");
|
||||||
|
|
||||||
|
ASSERT_EQ(pRead->pHead->head.version, ver);
|
||||||
|
ASSERT_EQ(pRead->curVersion, ver + 1);
|
||||||
|
char newStr[100];
|
||||||
|
sprintf(newStr, "%s-%d", ranStr, ver);
|
||||||
|
int len = strlen(newStr);
|
||||||
|
ASSERT_EQ(pRead->pHead->head.len, len);
|
||||||
|
for (int j = 0; j < len; j++) {
|
||||||
|
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,7 +85,6 @@ if $data02 != 2 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
return
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
|
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
|
||||||
|
@ -104,4 +103,4 @@ if $rows != 2 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
Loading…
Reference in New Issue