diff --git a/engine/cmd/database-lab/main.go b/engine/cmd/database-lab/main.go index 26bf2d69d05b004c393014f852cd3d7e40c76f6b..ce4c928402d12aeb6beedaae33d5380179e4270b 100644 --- a/engine/cmd/database-lab/main.go +++ b/engine/cmd/database-lab/main.go @@ -113,7 +113,11 @@ func main() { } // Create a new retrieval service to prepare a data directory and start snapshotting. - retrievalSvc := retrieval.New(cfg, engProps, docker, pm, tm, runner) + retrievalSvc, err := retrieval.New(cfg, engProps, docker, pm, tm, runner) + if err != nil { + log.Errf(errors.WithMessage(err, `error in the "retrieval" section of the config`).Error()) + return + } // Create a cloning service to provision new clones. provisioner, err := provision.New(ctx, &cfg.Provision, dbCfg, docker, pm, engProps.InstanceID, internalNetworkID) @@ -149,7 +153,7 @@ func main() { EngineVersion: version.GetVersion(), DBVersion: provisioner.DetectDBVersion(), Pools: pm.CollectPoolStat(), - Restore: retrievalSvc.CollectRestoreTelemetry(), + Restore: retrievalSvc.ReportState(), }) embeddedUI := embeddedui.New(cfg.EmbeddedUI, engProps, runner, docker) @@ -234,7 +238,8 @@ func reloadConfig(ctx context.Context, provisionSvc *provision.Provisioner, tm * return err } - if err := retrieval.IsValidConfig(cfg); err != nil { + newRetrievalConfig, err := retrieval.ValidateConfig(&cfg.Retrieval) + if err != nil { return err } @@ -258,7 +263,7 @@ func reloadConfig(ctx context.Context, provisionSvc *provision.Provisioner, tm * provisionSvc.Reload(cfg.Provision, dbCfg) tm.Reload(cfg.Global) - retrievalSvc.Reload(ctx, cfg) + retrievalSvc.Reload(ctx, newRetrievalConfig) cloningSvc.Reload(cfg.Cloning) platformSvc.Reload(newPlatformSvc) est.Reload(cfg.Estimator) diff --git a/engine/internal/provision/pool/pool_manager.go b/engine/internal/provision/pool/pool_manager.go index e58c0c8b9b2cbfe9dd54175cfcc35e5e2bfc5d6b..fb56f80e38985962f8ba5cb7eecb2957e59922cb 100644 --- a/engine/internal/provision/pool/pool_manager.go +++ b/engine/internal/provision/pool/pool_manager.go @@ -9,6 +9,7 @@ import ( "fmt" "os" "path" + "path/filepath" "sync" "time" @@ -346,6 +347,11 @@ func (pm *Manager) examineEntries(entries []os.DirEntry) (map[string]FSManager, continue } + if err := preparePoolDirectories(fsm); err != nil { + log.Msg("failed to prepare pool directories:", err.Error()) + continue + } + fsm.RefreshSnapshotList() fsManagers[pool.Name] = fsm @@ -398,6 +404,22 @@ func extractDataStateAt(dataPath string) (*time.Time, error) { return &dsa, nil } +func preparePoolDirectories(fsm FSManager) error { + dataDir := fsm.Pool().DataDir() + if err := os.MkdirAll(dataDir, 0700); err != nil { + return err + } + + return filepath.Walk(dataDir, func(name string, info os.FileInfo, err error) error { + if err == nil { + // PGDATA dir permissions must be 0700 to avoid errors. + err = os.Chmod(name, 0700) + } + + return err + }) +} + func (pm *Manager) describeAvailablePools() []string { availablePools := []string{} diff --git a/engine/internal/retrieval/retrieval.go b/engine/internal/retrieval/retrieval.go index 234add40682f93c4a0a2f178314538a5d3da07b8..abf60cf80b270527e97cd9c2368f31912fe12cc8 100644 --- a/engine/internal/retrieval/retrieval.go +++ b/engine/internal/retrieval/retrieval.go @@ -8,10 +8,7 @@ package retrieval import ( "context" "fmt" - "os" - "path/filepath" "strings" - "sync" "time" "github.com/docker/docker/client" @@ -37,21 +34,26 @@ import ( "gitlab.com/postgres-ai/database-lab/v3/pkg/models" ) +const ( + refreshJobs jobGroup = "refresh" + snapshotJobs jobGroup = "snapshot" +) + +type jobGroup string + // Retrieval describes a data retrieval. type Retrieval struct { - Scheduler Scheduler - State State - cfg *config.Config - global *global.Config - engineProps global.EngineProps - docker *client.Client - poolManager *pool.Manager - tm *telemetry.Agent - runner runners.Runner - jobs []components.JobRunner - retrieveMutex sync.Mutex - ctxCancel context.CancelFunc - jobSpecs map[string]config.JobSpec + Scheduler Scheduler + State State + cfg *config.Config + global *global.Config + engineProps global.EngineProps + docker *client.Client + poolManager *pool.Manager + tm *telemetry.Agent + runner runners.Runner + ctxCancel context.CancelFunc + statefulJobs []components.JobRunner } // Scheduler defines a refresh scheduler. @@ -62,38 +64,49 @@ type Scheduler struct { // New creates a new data retrieval. func New(cfg *dblabCfg.Config, engineProps global.EngineProps, docker *client.Client, pm *pool.Manager, tm *telemetry.Agent, - runner runners.Runner) *Retrieval { + runner runners.Runner) (*Retrieval, error) { r := &Retrieval{ - cfg: &cfg.Retrieval, global: &cfg.Global, engineProps: engineProps, docker: docker, poolManager: pm, tm: tm, runner: runner, - jobSpecs: make(map[string]config.JobSpec, len(cfg.Retrieval.Jobs)), State: State{ Status: models.Inactive, alerts: make(map[models.AlertType]models.Alert), }, } - r.formatJobsSpec() - r.defineRetrievalMode() + retrievalCfg, err := ValidateConfig(&cfg.Retrieval) + if err != nil { + return nil, err + } - return r + r.setup(retrievalCfg) + + return r, nil } // Reload reloads retrieval configuration. -func (r *Retrieval) Reload(ctx context.Context, cfg *dblabCfg.Config) { - *r.cfg = cfg.Retrieval +func (r *Retrieval) Reload(ctx context.Context, retrievalCfg *config.Config) { + r.setup(retrievalCfg) + r.reloadStatefulJobs() + r.stopScheduler() + r.setupScheduler(ctx) +} - r.formatJobsSpec() +func (r *Retrieval) setup(retrievalCfg *config.Config) { + r.cfg = retrievalCfg + + r.defineRetrievalMode() +} - for _, job := range r.jobs { +func (r *Retrieval) reloadStatefulJobs() { + for _, job := range r.statefulJobs { cfg, ok := r.cfg.JobsSpec[job.Name()] if !ok { - log.Msg("Skip reloading of the retrieval job", job.Name()) + log.Msg("Skip reloading of the stateful retrieval job. Spec not found", job.Name()) continue } @@ -101,21 +114,6 @@ func (r *Retrieval) Reload(ctx context.Context, cfg *dblabCfg.Config) { log.Err("Failed to reload configuration of the retrieval job", job.Name(), err) } } - - r.stopScheduler() - r.setupScheduler(ctx) -} - -func (r *Retrieval) formatJobsSpec() { - for _, jobName := range r.cfg.Jobs { - jobSpec, ok := r.cfg.JobsSpec[jobName] - if !ok { - continue - } - - jobSpec.Name = jobName - r.jobSpecs[jobName] = jobSpec - } } // Run start retrieving process. @@ -125,7 +123,7 @@ func (r *Retrieval) Run(ctx context.Context) error { log.Msg("Retrieval mode:", r.State.Mode) - fsManager, err := r.getPoolToDataRetrieving() + fsManager, err := r.getNextPoolToDataRetrieving() if err != nil { var skipError *SkipRefreshingError if errors.As(err, &skipError) { @@ -164,7 +162,7 @@ func (r *Retrieval) Run(ctx context.Context) error { return nil } -func (r *Retrieval) getPoolToDataRetrieving() (pool.FSManager, error) { +func (r *Retrieval) getNextPoolToDataRetrieving() (pool.FSManager, error) { firstPool := r.poolManager.First() if firstPool == nil { return nil, errors.New("no available pools") @@ -199,88 +197,152 @@ func (r *Retrieval) getPoolToDataRetrieving() (pool.FSManager, error) { } func (r *Retrieval) run(ctx context.Context, fsm pool.FSManager) (err error) { - if err := r.configure(fsm); err != nil { - return errors.Wrap(err, "failed to configure") + // Check the pool aliveness. + if _, err := fsm.GetFilesystemState(); err != nil { + return errors.Wrap(errors.Unwrap(err), "filesystem manager is not ready") } - if err := r.prepareEnvironment(fsm); err != nil { - return errors.Wrap(err, "failed to prepare retrieval environment") + poolName := fsm.Pool().Name + poolElement := r.poolManager.GetPoolByName(poolName) + + if poolElement == nil { + return errors.Errorf("pool %s not found", poolName) } - // Check the pool aliveness. - if _, err := fsm.GetFilesystemState(); err != nil { - return errors.Wrap(errors.Unwrap(err), "filesystem manager is not ready") + if err := r.RefreshData(ctx, poolName); err != nil { + return err } - poolByName := r.poolManager.GetPoolByName(fsm.Pool().Name) - if poolByName == nil { - return errors.Errorf("pool %s not found", fsm.Pool().Name) + if r.State.Status == models.Renewed { + r.State.cleanAlerts() } - if len(r.jobs) > 0 { - fsm.Pool().SetStatus(resources.RefreshingPool) + if err := r.SnapshotData(ctx, poolName); err != nil { + return err + } - r.retrieveMutex.Lock() - r.State.Status = models.Refreshing - r.State.LastRefresh = models.NewLocalTime(time.Now().Truncate(time.Second)) + if r.State.Status == models.Finished { + r.poolManager.MakeActive(poolElement) + r.State.cleanAlerts() + } - defer func() { - r.State.Status = models.Finished + return nil +} - if err != nil { - r.State.Status = models.Failed +// RefreshData runs a group of data refresh jobs. +func (r *Retrieval) RefreshData(ctx context.Context, poolName string) error { + fsm, err := r.poolManager.GetFSManager(poolName) + if err != nil { + return fmt.Errorf("failed to get %q FSManager: %w", poolName, err) + } - fsm.Pool().SetStatus(resources.EmptyPool) - } + if r.State.Status == models.Refreshing || r.State.Status == models.Snapshotting { + return fmt.Errorf("skip refreshing the data because the pool is still busy: %s", r.State.Status) + } - r.retrieveMutex.Unlock() - }() + jobs, err := r.buildJobs(fsm, refreshJobs) + if err != nil { + return fmt.Errorf("failed to build refresh jobs for %s: %w", poolName, err) + } - for _, j := range r.jobs { - if err := j.Run(ctx); err != nil { - return err - } - } + if len(jobs) == 0 { + log.Dbg("no jobs to refresh pool:", fsm.Pool()) + return nil } - r.poolManager.MakeActive(poolByName) - r.State.cleanAlerts() + log.Dbg("Refreshing data pool: ", fsm.Pool()) + + fsm.Pool().SetStatus(resources.RefreshingPool) + + r.State.Status = models.Refreshing + r.State.LastRefresh = models.NewLocalTime(time.Now().Truncate(time.Second)) + + defer func() { + r.State.Status = models.Renewed + + if err != nil { + r.State.Status = models.Failed + + fsm.Pool().SetStatus(resources.EmptyPool) + } + }() + + for _, j := range jobs { + if err = j.Run(ctx); err != nil { + return err + } + } return nil } -// configure configures retrieval service. -func (r *Retrieval) configure(fsm pool.FSManager) error { - if len(r.cfg.Jobs) == 0 { - return nil +// SnapshotData runs a group of data snapshot jobs. +func (r *Retrieval) SnapshotData(ctx context.Context, poolName string) error { + fsm, err := r.poolManager.GetFSManager(poolName) + if err != nil { + return fmt.Errorf("failed to get %q FSManager: %w", poolName, err) + } + + if r.State.Status != models.Inactive && r.State.Status != models.Renewed && r.State.Status != models.Finished { + return fmt.Errorf("pool is not ready to take a snapshot: %s", r.State.Status) + } + + jobs, err := r.buildJobs(fsm, snapshotJobs) + if err != nil { + return fmt.Errorf("failed to build snapshot jobs for %s: %w", poolName, err) + } + + if r.State.Mode == models.Physical { + r.statefulJobs = jobs } - if err := r.parseJobs(fsm); err != nil { - return errors.Wrap(err, "failed to parse retrieval jobs") + if len(jobs) == 0 { + log.Dbg("no jobs to snapshot pool data:", fsm.Pool()) + return nil } - if err := r.validate(); err != nil { - return errors.Wrap(err, "invalid data retrieval configuration") + log.Dbg("Taking a snapshot on the pool: ", fsm.Pool()) + + r.State.Status = models.Snapshotting + + defer func() { + r.State.Status = models.Finished + + if err != nil { + r.State.Status = models.Failed + + fsm.Pool().SetStatus(resources.EmptyPool) + } + }() + + for _, j := range jobs { + if err = j.Run(ctx); err != nil { + return err + } } return nil } -// parseJobs processes configuration to define data retrieval jobs. -func (r *Retrieval) parseJobs(fsm pool.FSManager) error { +// buildJobs processes the configuration spec to build data retrieval jobs. +func (r *Retrieval) buildJobs(fsm pool.FSManager, groupName jobGroup) ([]components.JobRunner, error) { retrievalRunner, err := engine.JobBuilder(r.global, r.engineProps, fsm, r.tm) if err != nil { - return errors.Wrap(err, "failed to get a job builder") + return nil, errors.Wrap(err, "failed to get a job builder") } dbMarker := dbmarker.NewMarker(fsm.Pool().DataDir()) - - r.jobs = make([]components.JobRunner, 0, len(r.cfg.Jobs)) + jobs := make([]components.JobRunner, 0) for _, jobName := range r.cfg.Jobs { - jobSpec, ok := r.jobSpecs[jobName] + jobSpec, ok := r.cfg.JobsSpec[jobName] if !ok { - return errors.Errorf("Job %q not found", jobName) + return nil, errors.Errorf("job %q not found", jobName) + } + + if getJobGroup(jobSpec.Name) != groupName { + log.Dbg(fmt.Sprintf("Skip the %s job because it does not belong to the %s group", jobName, groupName)) + continue } jobCfg := config.JobConfig{ @@ -292,71 +354,34 @@ func (r *Retrieval) parseJobs(fsm pool.FSManager) error { job, err := retrievalRunner.BuildJob(jobCfg) if err != nil { - return errors.Wrap(err, "failed to build job") + return nil, errors.Wrap(err, "failed to build job") } - r.addJob(job) - } - - return nil -} - -// addJob applies a job to the current data retrieval. -func (r *Retrieval) addJob(job components.JobRunner) { - r.jobs = append(r.jobs, job) -} - -func (r *Retrieval) validate() error { - if r.hasLogicalJob() && r.hasPhysicalJob() { - return errors.New("must not contain physical and logical jobs simultaneously") - } - - return nil -} - -func (r *Retrieval) hasLogicalJob() bool { - if len(r.jobSpecs) == 0 { - return false - } - - if _, hasLogicalDump := r.jobSpecs[logical.DumpJobType]; hasLogicalDump { - return true - } - - if _, hasLogicalRestore := r.jobSpecs[logical.RestoreJobType]; hasLogicalRestore { - return true - } - - if _, hasLogicalSnapshot := r.jobSpecs[snapshot.LogicalSnapshotType]; hasLogicalSnapshot { - return true + jobs = append(jobs, job) } - return false + return jobs, nil } -func (r *Retrieval) hasPhysicalJob() bool { - if len(r.jobSpecs) == 0 { - return false - } - - if _, hasPhysicalRestore := r.jobSpecs[physical.RestoreJobType]; hasPhysicalRestore { - return true - } +func getJobGroup(name string) jobGroup { + switch name { + case logical.DumpJobType, logical.RestoreJobType, physical.RestoreJobType: + return refreshJobs - if _, hasPhysicalSnapshot := r.jobSpecs[snapshot.PhysicalSnapshotType]; hasPhysicalSnapshot { - return true + case snapshot.LogicalSnapshotType, snapshot.PhysicalSnapshotType: + return snapshotJobs } - return false + return "" } func (r *Retrieval) defineRetrievalMode() { - if r.hasPhysicalJob() { + if hasPhysicalJob(r.cfg.JobsSpec) { r.State.Mode = models.Physical return } - if r.hasLogicalJob() { + if hasLogicalJob(r.cfg.JobsSpec) { r.State.Mode = models.Logical return } @@ -364,22 +389,6 @@ func (r *Retrieval) defineRetrievalMode() { r.State.Mode = models.Unknown } -func (r *Retrieval) prepareEnvironment(fsm pool.FSManager) error { - dataDir := fsm.Pool().DataDir() - if err := os.MkdirAll(dataDir, 0700); err != nil { - return err - } - - return filepath.Walk(dataDir, func(name string, info os.FileInfo, err error) error { - if err == nil { - // PGDATA dir permissions must be 0700 to avoid errors. - err = os.Chmod(name, 0700) - } - - return err - }) -} - func (r *Retrieval) setupScheduler(ctx context.Context) { r.stopScheduler() @@ -414,10 +423,10 @@ func (r *Retrieval) refreshFunc(ctx context.Context) func() { // fullRefresh performs full refresh for an unused storage pool and makes it active. func (r *Retrieval) fullRefresh(ctx context.Context) error { - if r.State.Status == models.Refreshing { + if r.State.Status == models.Refreshing || r.State.Status == models.Snapshotting { alert := telemetry.Alert{ Level: models.RefreshSkipped, - Message: "The data refresh is currently in progress. Skip a new data refresh iteration", + Message: "The data refresh/snapshot is currently in progress. Skip a new data refresh iteration", } r.State.addAlert(alert) r.tm.SendEvent(ctx, telemetry.AlertEvent, alert) @@ -487,31 +496,6 @@ func (r *Retrieval) stopScheduler() { } } -// IsValidConfig checks if the retrieval configuration is valid. -func IsValidConfig(cfg *dblabCfg.Config) error { - rs := New(cfg, global.EngineProps{}, nil, nil, nil, nil) - - cm, err := pool.NewManager(nil, pool.ManagerConfig{ - Pool: &resources.Pool{ - Name: "", - Mode: "zfs", - }, - }) - if err != nil { - return nil - } - - if err := rs.configure(cm); err != nil { - return err - } - - if err := rs.validate(); err != nil { - return err - } - - return nil -} - func preparePoolToRefresh(poolToUpdate pool.FSManager) error { cloneList, err := poolToUpdate.ListClonesNames() if err != nil { @@ -540,8 +524,8 @@ func preparePoolToRefresh(poolToUpdate pool.FSManager) error { return nil } -// CollectRestoreTelemetry collect restore data. -func (r *Retrieval) CollectRestoreTelemetry() telemetry.Restore { +// ReportState collects the current restore state. +func (r *Retrieval) ReportState() telemetry.Restore { return telemetry.Restore{ Mode: r.State.Mode, Refreshing: r.cfg.Refresh.Timetable, diff --git a/engine/internal/retrieval/retrieval_test.go b/engine/internal/retrieval/retrieval_test.go index f90b47e6a4308d79b47da79e631a31846c6aa946..99b70a6e37619c72ff6d40424f5861cfe086d889 100644 --- a/engine/internal/retrieval/retrieval_test.go +++ b/engine/internal/retrieval/retrieval_test.go @@ -4,149 +4,40 @@ import ( "testing" "github.com/stretchr/testify/assert" - - "gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/config" ) -func TestParallelJobSpecs(t *testing.T) { - testCases := []map[string]config.JobSpec{ - { - "logicalRestore": {}, - }, - { - "physicalRestore": {}, - }, - { - "logicalDump": {}, - }, - { - "logicalDump": {}, - "logicalRestore": {}, - }, - } - - for _, tc := range testCases { - r := Retrieval{ - jobSpecs: tc, - } - - err := r.validate() - assert.Nil(t, err) - } - -} - -func TestInvalidParallelJobSpecs(t *testing.T) { - testCases := []map[string]config.JobSpec{ - { - "physicalRestore": {}, - "logicalRestore": {}, - }, - } - - for _, tc := range testCases { - r := Retrieval{ - jobSpecs: tc, - } - - err := r.validate() - assert.Error(t, err) - } -} - -func TestPhysicalJobs(t *testing.T) { +func TestJobGroup(t *testing.T) { testCases := []struct { - spec map[string]config.JobSpec - hasPhysical bool + jobName string + group jobGroup }{ { - spec: map[string]config.JobSpec{"physicalSnapshot": {}}, - hasPhysical: true, + jobName: "logicalDump", + group: refreshJobs, }, { - spec: map[string]config.JobSpec{"physicalRestore": {}}, - hasPhysical: true, + jobName: "logicalRestore", + group: refreshJobs, }, { - spec: map[string]config.JobSpec{ - "physicalSnapshot": {}, - "physicalRestore": {}, - }, - hasPhysical: true, + jobName: "physicalRestore", + group: refreshJobs, }, { - spec: map[string]config.JobSpec{}, - hasPhysical: false, + jobName: "logicalSnapshot", + group: snapshotJobs, }, { - spec: map[string]config.JobSpec{"logicalDump": {}}, - hasPhysical: false, + jobName: "physicalSnapshot", + group: snapshotJobs, }, { - spec: map[string]config.JobSpec{"logicalRestore": {}}, - hasPhysical: false, - }, - { - spec: map[string]config.JobSpec{"logicalSnapshot": {}}, - hasPhysical: false, - }, - } - - for _, tc := range testCases { - r := Retrieval{ - jobSpecs: tc.spec, - } - - hasPhysicalJob := r.hasPhysicalJob() - assert.Equal(t, tc.hasPhysical, hasPhysicalJob) - } -} - -func TestLogicalJobs(t *testing.T) { - testCases := []struct { - spec map[string]config.JobSpec - hasLogical bool - }{ - { - spec: map[string]config.JobSpec{"logicalSnapshot": {}}, - hasLogical: true, - }, - { - spec: map[string]config.JobSpec{"logicalRestore": {}}, - hasLogical: true, - }, - { - spec: map[string]config.JobSpec{"logicalDump": {}}, - hasLogical: true, - }, - { - spec: map[string]config.JobSpec{ - "logicalDump": {}, - "logicalRestore": {}, - "logicalSnapshot": {}, - }, - hasLogical: true, - }, - { - spec: map[string]config.JobSpec{}, - hasLogical: false, - }, - { - spec: map[string]config.JobSpec{"physicalRestore": {}}, - hasLogical: false, - }, - { - spec: map[string]config.JobSpec{"physicalSnapshot": {}}, - hasLogical: false, + jobName: "unknownDump", + group: "", }, } for _, tc := range testCases { - r := Retrieval{ - jobSpecs: tc.spec, - } - - hasLogicalJob := r.hasLogicalJob() - assert.Equal(t, tc.hasLogical, hasLogicalJob) + assert.Equal(t, tc.group, getJobGroup(tc.jobName)) } } diff --git a/engine/internal/retrieval/validator.go b/engine/internal/retrieval/validator.go new file mode 100644 index 0000000000000000000000000000000000000000..e5d6ca0f193ccb90055dc6dd3d0fa9a708c835a9 --- /dev/null +++ b/engine/internal/retrieval/validator.go @@ -0,0 +1,100 @@ +package retrieval + +import ( + "errors" + "fmt" + "strings" + + "gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/config" + "gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/logical" + "gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/physical" + "gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/snapshot" +) + +// ValidateConfig validates retrieval configuration. +func ValidateConfig(cfg *config.Config) (*config.Config, error) { + retrievalCfg, err := formatJobsSpec(cfg) + if err != nil { + return nil, err + } + + if err = validateStructure(retrievalCfg); err != nil { + return nil, err + } + + return retrievalCfg, nil +} + +// formatJobsSpec validates job list and enriches job specifications. +func formatJobsSpec(cfg *config.Config) (*config.Config, error) { + jobSpecs := make(map[string]config.JobSpec, len(cfg.Jobs)) + undefinedJobs := []string{} + + for _, jobName := range cfg.Jobs { + jobSpec, ok := cfg.JobsSpec[jobName] + if !ok { + undefinedJobs = append(undefinedJobs, jobName) + continue + } + + jobSpec.Name = jobName + jobSpecs[jobName] = jobSpec + } + + if len(undefinedJobs) > 0 { + return nil, fmt.Errorf("config contains jobs without specification: %s", strings.Join(undefinedJobs, ", ")) + } + + jobsCfg := &config.Config{ + Refresh: cfg.Refresh, + Jobs: cfg.Jobs, + JobsSpec: jobSpecs, + } + + return jobsCfg, nil +} + +// validateStructure checks if the retrieval configuration is valid. +func validateStructure(r *config.Config) error { + if hasLogicalJob(r.JobsSpec) && hasPhysicalJob(r.JobsSpec) { + return errors.New("must not contain physical and logical jobs simultaneously") + } + + return nil +} + +func hasLogicalJob(jobSpecs map[string]config.JobSpec) bool { + if len(jobSpecs) == 0 { + return false + } + + if _, hasLogicalDump := jobSpecs[logical.DumpJobType]; hasLogicalDump { + return true + } + + if _, hasLogicalRestore := jobSpecs[logical.RestoreJobType]; hasLogicalRestore { + return true + } + + if _, hasLogicalSnapshot := jobSpecs[snapshot.LogicalSnapshotType]; hasLogicalSnapshot { + return true + } + + return false +} + +func hasPhysicalJob(jobSpecs map[string]config.JobSpec) bool { + if len(jobSpecs) == 0 { + return false + } + + if _, hasPhysicalRestore := jobSpecs[physical.RestoreJobType]; hasPhysicalRestore { + return true + } + + if _, hasPhysicalSnapshot := jobSpecs[snapshot.PhysicalSnapshotType]; hasPhysicalSnapshot { + return true + } + + return false +} diff --git a/engine/internal/retrieval/validator_test.go b/engine/internal/retrieval/validator_test.go new file mode 100644 index 0000000000000000000000000000000000000000..3f44e66afc71602d81772b60c36b132ce5621d1e --- /dev/null +++ b/engine/internal/retrieval/validator_test.go @@ -0,0 +1,149 @@ +package retrieval + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/config" +) + +func TestParallelJobSpecs(t *testing.T) { + testCases := []config.Config{ + { + Jobs: []string{"logicalRestore"}, + JobsSpec: map[string]config.JobSpec{ + "logicalRestore": {}, + }, + }, + { + Jobs: []string{"physicalRestore"}, + JobsSpec: map[string]config.JobSpec{ + "physicalRestore": {}, + }, + }, + { + Jobs: []string{"logicalDump"}, + JobsSpec: map[string]config.JobSpec{ + "logicalDump": {}, + }, + }, + { + Jobs: []string{"logicalDump", "logicalRestore"}, + JobsSpec: map[string]config.JobSpec{ + "logicalDump": {}, + "logicalRestore": {}, + }, + }, + } + + for _, tc := range testCases { + err := validateStructure(&tc) + assert.Nil(t, err) + } + +} + +func TestInvalidParallelJobSpecs(t *testing.T) { + testCases := []config.Config{ + { + Jobs: []string{"logicalRestore", "physicalRestore"}, + JobsSpec: map[string]config.JobSpec{ + "physicalRestore": {}, + "logicalRestore": {}, + }, + }, + } + + for _, tc := range testCases { + err := validateStructure(&tc) + assert.Error(t, err) + } +} + +func TestPhysicalJobs(t *testing.T) { + testCases := []struct { + spec map[string]config.JobSpec + hasPhysical bool + }{ + { + spec: map[string]config.JobSpec{"physicalSnapshot": {}}, + hasPhysical: true, + }, + { + spec: map[string]config.JobSpec{"physicalRestore": {}}, + hasPhysical: true, + }, + { + spec: map[string]config.JobSpec{ + "physicalSnapshot": {}, + "physicalRestore": {}, + }, + hasPhysical: true, + }, + { + spec: map[string]config.JobSpec{}, + hasPhysical: false, + }, + { + spec: map[string]config.JobSpec{"logicalDump": {}}, + hasPhysical: false, + }, + { + spec: map[string]config.JobSpec{"logicalRestore": {}}, + hasPhysical: false, + }, + { + spec: map[string]config.JobSpec{"logicalSnapshot": {}}, + hasPhysical: false, + }, + } + + for _, tc := range testCases { + assert.Equal(t, tc.hasPhysical, hasPhysicalJob(tc.spec)) + } +} + +func TestLogicalJobs(t *testing.T) { + testCases := []struct { + spec map[string]config.JobSpec + hasLogical bool + }{ + { + spec: map[string]config.JobSpec{"logicalSnapshot": {}}, + hasLogical: true, + }, + { + spec: map[string]config.JobSpec{"logicalRestore": {}}, + hasLogical: true, + }, + { + spec: map[string]config.JobSpec{"logicalDump": {}}, + hasLogical: true, + }, + { + spec: map[string]config.JobSpec{ + "logicalDump": {}, + "logicalRestore": {}, + "logicalSnapshot": {}, + }, + hasLogical: true, + }, + { + spec: map[string]config.JobSpec{}, + hasLogical: false, + }, + { + spec: map[string]config.JobSpec{"physicalRestore": {}}, + hasLogical: false, + }, + { + spec: map[string]config.JobSpec{"physicalSnapshot": {}}, + hasLogical: false, + }, + } + + for _, tc := range testCases { + assert.Equal(t, tc.hasLogical, hasLogicalJob(tc.spec)) + } +} diff --git a/engine/pkg/config/config.go b/engine/pkg/config/config.go index 2d49c9c3c0f7406c0e28a13d58b1dec5ffa43cf6..420467f8a132c1bb4acff8c034b7ab301c75b25a 100644 --- a/engine/pkg/config/config.go +++ b/engine/pkg/config/config.go @@ -75,7 +75,7 @@ func LoadInstanceID() (string, error) { instanceID = xid.New().String() log.Dbg("no instance_id file was found, generate new instance ID", instanceID) - if err := os.MkdirAll(path.Dir(idFilepath), 0644); err != nil { + if err := os.MkdirAll(path.Dir(idFilepath), 0744); err != nil { return "", fmt.Errorf("failed to make directory meta: %w", err) } diff --git a/engine/pkg/models/retrieval.go b/engine/pkg/models/retrieval.go index cfae483c7954a2375e93f5efac477cfbb68845b9..f4e8cbe6d14f3e21a902f755b8915794264d53d5 100644 --- a/engine/pkg/models/retrieval.go +++ b/engine/pkg/models/retrieval.go @@ -30,6 +30,10 @@ const ( Failed RetrievalStatus = "failed" // Refreshing defines status when data retrieving is in progress. Refreshing RetrievalStatus = "refreshing" + // Renewed defines status when data retrieving is successfully completed. + Renewed RetrievalStatus = "renewed" + // Snapshotting defines status when data snapshotting is in progress. + Snapshotting RetrievalStatus = "snapshotting" // Finished defines status when data retrieving is finished. Finished RetrievalStatus = "finished" )