Merge branch '3.0' into enh/refactorBackend

This commit is contained in:
yihaoDeng 2023-11-13 18:07:38 +08:00
parent 1786f7077d
commit 4104840cea
5 changed files with 43 additions and 23 deletions

View File

@ -467,7 +467,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
// data.infileFD = NULL;
// data.noStatus = noStatus;
uError("ERROR: %s stat file %s: ", __func__, file);
// uError("ERROR: %s stat file %s: ", __func__, file);
if (taosStatFile(file, &contentLength, NULL, NULL) < 0) {
uError("ERROR: %s Failed to stat file %s: ", __func__, file);
code = TAOS_SYSTEM_ERROR(errno);

View File

@ -247,7 +247,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
// internal
int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointInterval = 30;
int32_t tsStreamCheckpointInterval = 10;
float tsSinkDataRate = 2.0;
int32_t tsStreamNodeCheckInterval = 15;
int32_t tsTtlUnit = 86400;
@ -721,7 +721,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1;
if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 20, 1200, CFG_SCOPE_SERVER,
if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 1, 1200, CFG_SCOPE_SERVER,
CFG_DYN_ENT_SERVER) != 0)
return -1;
if (cfgAddFloat(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;

View File

@ -77,6 +77,7 @@ typedef struct {
SArray* chkpInUse;
int32_t chkpCap;
TdThreadRwlock chkpDirLock;
int64_t dataWritten;
} STaskDbWrapper;

View File

@ -891,6 +891,7 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) {
}
code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir);
pTaskDb->dataWritten = 0;
pTaskDb->chkpId = chkpId;
@ -2161,7 +2162,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
break; \
} \
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
char toString[128] = {0}; \
wrapper->dataWritten += 1; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
@ -2237,7 +2239,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
break; \
} \
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
char toString[128] = {0}; \
wrapper->dataWritten += 1; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
@ -2277,6 +2280,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
stDebug("streamStateClear_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
wrapper->dataWritten += 1;
char sKeyStr[128] = {0};
char eKeyStr[128] = {0};
@ -3253,6 +3257,7 @@ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rock
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl) {
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
wrapper->dataWritten += 1;
int i = streamStateGetCfIdx(pState, cfKeyName);
if (i < 0) {
@ -3285,7 +3290,9 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
char* ttlV = tmpBuf;
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
wrapper->dataWritten += 1;
rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
@ -3303,6 +3310,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
char* err = NULL;
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
wrapper->dataWritten += 1;
rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err);
if (err != NULL) {
stError("streamState failed to write batch, err:%s", err);
@ -3401,6 +3409,26 @@ int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
return code;
}
void hashTableToDebug(SHashObj* pTbl) {
size_t sz = taosHashGetSize(pTbl);
int32_t total = 0;
char* buf = taosMemoryCalloc(1, sz * 16);
void* pIter = taosHashIterate(pTbl, NULL);
while (pIter) {
size_t len = 0;
char* name = taosHashGetKey(pIter, &len);
char* tname = taosMemoryCalloc(1, len + 1);
memcpy(tname, name, len);
total += sprintf(buf + total, "%s,", tname);
pIter = taosHashIterate(pTbl, pIter);
taosMemoryFree(tname);
}
buf[total - 1] = 0;
stTrace("curr file list:[%s]", buf);
taosMemoryFree(buf);
}
int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
taosThreadRwlockWrlock(&p->rwLock);
@ -3420,6 +3448,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
taosHashClear(p->pSstTbl[1 - p->idx]);
TdDirPtr pDir = taosOpenDir(p->buf);
TdDirEntryPtr de = NULL;
@ -3445,17 +3474,13 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
continue;
}
}
taosCloseDir(&pDir);
void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL);
while (pIter) {
size_t len = 0;
char* name = taosHashGetKey(pIter, &len);
stTrace("chkp get file list: 1-1");
hashTableToDebug(p->pSstTbl[1 - p->idx]);
char* buf = taosMemoryCalloc(1, len + 1);
strncpy(buf, name, len);
stError("curr file list: %s", buf);
pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
}
stTrace("chkp get file list: 1-2");
hashTableToDebug(p->pSstTbl[p->idx]);
if (p->init == 0) {
void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL);
@ -3482,7 +3507,6 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
taosArrayClearP(p->pDel, taosMemoryFree);
taosHashClear(p->pSstTbl[1 - p->idx]);
p->update = 0;
taosCloseDir(&pDir);
return code;
}
@ -3493,11 +3517,8 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
p->preCkptId = p->curChkpId;
p->curChkpId = chkpId;
}
taosHashClear(p->pSstTbl[p->idx]);
p->idx = 1 - p->idx;
taosCloseDir(&pDir);
taosThreadRwlockUnlock(&p->rwLock);
return 0;
@ -3664,7 +3685,7 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp** ppChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
SDbChkp* pChkp = NULL;
SDbChkp* pChkp = ppChkp != NULL ? *ppChkp : NULL;
if (pChkp == NULL) {
char* taskPath = taosMemoryCalloc(1, strlen(bm->path) + 64);
@ -3679,8 +3700,6 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
code = dbChkpDumpTo(pChkp, dname);
taosThreadRwlockUnlock(&bm->rwLock);
return code;
} else {
pChkp = *ppChkp;
}
code = dbChkpGetDelta(pChkp, chkpId, list);

View File

@ -457,7 +457,7 @@ static int uploadCheckpointToS3(char* id, char* path) {
return -1;
}
stDebug("[s3] upload checkpoint:%s", filename);
break;
// break;
}
taosCloseDir(&pDir);