Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/api/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func newTestService(t *testing.T) *ApiService {
limits := instances.ResourceLimits{
MaxOverlaySize: 100 * 1024 * 1024 * 1024, // 100GB
}
instanceMgr := instances.NewManager(p, imageMgr, systemMgr, networkMgr, deviceMgr, volumeMgr, limits, "", nil, nil)
instanceMgr := instances.NewManager(p, imageMgr, systemMgr, networkMgr, deviceMgr, volumeMgr, limits, "", instances.SnapshotPolicy{}, nil, nil)

// Initialize network manager (creates bridge for network-enabled tests)
if err := networkMgr.Initialize(ctx(), nil); err != nil {
Expand Down
82 changes: 81 additions & 1 deletion cmd/api/api/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/kernel/hypeman/lib/network"
"github.com/kernel/hypeman/lib/oapi"
"github.com/kernel/hypeman/lib/resources"
"github.com/kernel/hypeman/lib/snapshot"
"github.com/kernel/hypeman/lib/vm_metrics"
"github.com/samber/lo"
)
Expand Down Expand Up @@ -264,6 +265,16 @@ func (s *ApiService) CreateInstance(ctx context.Context, request oapi.CreateInst
SkipKernelHeaders: request.Body.SkipKernelHeaders != nil && *request.Body.SkipKernelHeaders,
SkipGuestAgent: request.Body.SkipGuestAgent != nil && *request.Body.SkipGuestAgent,
}
if request.Body.SnapshotPolicy != nil {
snapshotPolicy, err := toInstanceSnapshotPolicy(*request.Body.SnapshotPolicy)
if err != nil {
return oapi.CreateInstance400JSONResponse{
Code: "invalid_snapshot_policy",
Message: err.Error(),
}, nil
}
domainReq.SnapshotPolicy = snapshotPolicy
}

inst, err := s.InstanceManager.CreateInstance(ctx, domainReq)
if err != nil {
Expand Down Expand Up @@ -401,7 +412,19 @@ func (s *ApiService) StandbyInstance(ctx context.Context, request oapi.StandbyIn
}
log := logger.FromContext(ctx)

result, err := s.InstanceManager.StandbyInstance(ctx, inst.Id)
standbyReq := instances.StandbyInstanceRequest{}
if request.Body != nil && request.Body.Compression != nil {
compression, err := toDomainSnapshotCompressionConfig(*request.Body.Compression)
if err != nil {
return oapi.StandbyInstance400JSONResponse{
Code: "invalid_snapshot_compression",
Message: err.Error(),
}, nil
}
standbyReq.Compression = compression
}

result, err := s.InstanceManager.StandbyInstance(ctx, inst.Id, standbyReq)
if err != nil {
switch {
case errors.Is(err, instances.ErrInvalidState):
Expand Down Expand Up @@ -858,6 +881,10 @@ func instanceToOAPI(inst instances.Instance) oapi.Instance {
if len(inst.Metadata) > 0 {
oapiInst.Metadata = toOAPIMetadata(inst.Metadata)
}
if inst.SnapshotPolicy != nil {
oapiPolicy, _ := toOAPISnapshotPolicy(*inst.SnapshotPolicy)
oapiInst.SnapshotPolicy = &oapiPolicy
}

// Convert volume attachments
if len(inst.Volumes) > 0 {
Expand Down Expand Up @@ -892,3 +919,56 @@ func instanceToOAPI(inst instances.Instance) oapi.Instance {

return oapiInst
}

// toDomainSnapshotCompressionConfig maps an OAPI compression config onto the
// domain-level snapshot compression config. Optional fields (algorithm, level)
// are copied only when present. The error return is kept for symmetry with the
// other converters; this conversion cannot currently fail.
func toDomainSnapshotCompressionConfig(cfg oapi.SnapshotCompressionConfig) (*snapshot.SnapshotCompressionConfig, error) {
	result := &snapshot.SnapshotCompressionConfig{Enabled: cfg.Enabled}
	if alg := cfg.Algorithm; alg != nil {
		result.Algorithm = snapshot.SnapshotCompressionAlgorithm(*alg)
	}
	if lvl := cfg.Level; lvl != nil {
		// Copy the value so the domain config does not alias the request body.
		v := *lvl
		result.Level = &v
	}
	return result, nil
}

// toInstanceSnapshotPolicy converts an OAPI snapshot policy into the
// instances-domain policy, propagating any compression conversion error.
func toInstanceSnapshotPolicy(policy oapi.SnapshotPolicy) (*instances.SnapshotPolicy, error) {
	result := &instances.SnapshotPolicy{}
	if policy.Compression == nil {
		// Nothing else to translate; an empty policy is valid.
		return result, nil
	}
	compression, err := toDomainSnapshotCompressionConfig(*policy.Compression)
	if err != nil {
		return nil, err
	}
	result.Compression = compression
	return result, nil
}

// toOAPISnapshotCompressionConfig maps a domain snapshot compression config
// back to its OAPI representation. A zero-value algorithm ("") and a nil level
// are treated as unset and omitted from the response. The error return is kept
// for symmetry with the other converters; this conversion cannot currently fail.
func toOAPISnapshotCompressionConfig(cfg snapshot.SnapshotCompressionConfig) (oapi.SnapshotCompressionConfig, error) {
	result := oapi.SnapshotCompressionConfig{Enabled: cfg.Enabled}
	if cfg.Algorithm != "" {
		oapiAlg := oapi.SnapshotCompressionConfigAlgorithm(cfg.Algorithm)
		result.Algorithm = &oapiAlg
	}
	if cfg.Level != nil {
		// Copy the value so the response does not alias domain state.
		v := *cfg.Level
		result.Level = &v
	}
	return result, nil
}

// toOAPISnapshotPolicy converts an instances-domain snapshot policy into its
// OAPI representation, propagating any compression conversion error.
func toOAPISnapshotPolicy(policy instances.SnapshotPolicy) (oapi.SnapshotPolicy, error) {
	result := oapi.SnapshotPolicy{}
	if policy.Compression == nil {
		// No compression settings configured; return the empty policy.
		return result, nil
	}
	compression, err := toOAPISnapshotCompressionConfig(*policy.Compression)
	if err != nil {
		return oapi.SnapshotPolicy{}, err
	}
	result.Compression = &compression
	return result, nil
}
33 changes: 30 additions & 3 deletions cmd/api/api/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
mw "github.com/kernel/hypeman/lib/middleware"
"github.com/kernel/hypeman/lib/network"
"github.com/kernel/hypeman/lib/oapi"
"github.com/kernel/hypeman/lib/snapshot"
"github.com/samber/lo"
)

Expand All @@ -27,11 +28,20 @@ func (s *ApiService) CreateInstanceSnapshot(ctx context.Context, request oapi.Cr
if request.Body.Name != nil {
name = *request.Body.Name
}
var compression *snapshot.SnapshotCompressionConfig
if request.Body.Compression != nil {
var err error
compression, err = toDomainSnapshotCompressionConfig(*request.Body.Compression)
if err != nil {
return oapi.CreateInstanceSnapshot400JSONResponse{Code: "invalid_snapshot_compression", Message: err.Error()}, nil
}
}

result, err := s.InstanceManager.CreateSnapshot(ctx, inst.Id, instances.CreateSnapshotRequest{
Kind: instances.SnapshotKind(request.Body.Kind),
Name: name,
Metadata: toMapMetadata(request.Body.Metadata),
Kind: instances.SnapshotKind(request.Body.Kind),
Name: name,
Metadata: toMapMetadata(request.Body.Metadata),
Compression: compression,
})
if err != nil {
log := logger.FromContext(ctx)
Expand Down Expand Up @@ -207,6 +217,23 @@ func snapshotToOAPI(snapshot instances.Snapshot) oapi.Snapshot {
SizeBytes: snapshot.SizeBytes,
Name: lo.ToPtr(snapshot.Name),
}
if snapshot.CompressionState != "" {
state := oapi.SnapshotCompressionState(snapshot.CompressionState)
out.CompressionState = &state
}
if snapshot.CompressionError != "" {
out.CompressionError = lo.ToPtr(snapshot.CompressionError)
}
if snapshot.Compression != nil {
compression, _ := toOAPISnapshotCompressionConfig(*snapshot.Compression)
out.Compression = &compression
}
if snapshot.CompressedSizeBytes != nil {
out.CompressedSizeBytes = snapshot.CompressedSizeBytes
}
if snapshot.UncompressedSizeBytes != nil {
out.UncompressedSizeBytes = snapshot.UncompressedSizeBytes
}
if snapshot.Name == "" {
out.Name = nil
}
Expand Down
29 changes: 29 additions & 0 deletions cmd/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,18 @@ type HypervisorConfig struct {
FirecrackerBinaryPath string `koanf:"firecracker_binary_path"`
}

// SnapshotCompressionDefaultConfig holds default snapshot compression settings.
// These act as server-side defaults when a request does not specify its own
// compression configuration.
type SnapshotCompressionDefaultConfig struct {
	// Enabled toggles compression by default (ships as false).
	Enabled bool `koanf:"enabled"`
	// Algorithm names the compressor; validation accepts "zstd" or "lz4"
	// (case-insensitive). Default is "zstd".
	Algorithm string `koanf:"algorithm"`
	// Level is the compression level; validation requires >= 1. Default is 1.
	Level int `koanf:"level"`
}

// SnapshotConfig holds snapshot defaults.
type SnapshotConfig struct {
	// CompressionDefault supplies the compression settings used when a
	// snapshot request does not override them.
	CompressionDefault SnapshotCompressionDefaultConfig `koanf:"compression_default"`
}

// GPUConfig holds GPU-related settings.
type GPUConfig struct {
ProfileCacheTTL string `koanf:"profile_cache_ttl"`
Expand All @@ -183,6 +195,7 @@ type Config struct {
Oversubscription OversubscriptionConfig `koanf:"oversubscription"`
Capacity CapacityConfig `koanf:"capacity"`
Hypervisor HypervisorConfig `koanf:"hypervisor"`
Snapshot SnapshotConfig `koanf:"snapshot"`
GPU GPUConfig `koanf:"gpu"`
}

Expand Down Expand Up @@ -302,6 +315,14 @@ func defaultConfig() *Config {
FirecrackerBinaryPath: "",
},

Snapshot: SnapshotConfig{
CompressionDefault: SnapshotCompressionDefaultConfig{
Enabled: false,
Algorithm: "zstd",
Level: 1,
},
},

GPU: GPUConfig{
ProfileCacheTTL: "30m",
},
Expand Down Expand Up @@ -400,5 +421,13 @@ func (c *Config) Validate() error {
if c.Build.Timeout <= 0 {
return fmt.Errorf("build.timeout must be positive, got %d", c.Build.Timeout)
}
if c.Snapshot.CompressionDefault.Level < 1 {
return fmt.Errorf("snapshot.compression_default.level must be >= 1, got %d", c.Snapshot.CompressionDefault.Level)
}
switch strings.ToLower(c.Snapshot.CompressionDefault.Algorithm) {
case "", "zstd", "lz4":
default:
return fmt.Errorf("snapshot.compression_default.algorithm must be one of zstd or lz4, got %q", c.Snapshot.CompressionDefault.Algorithm)
}
return nil
}
2 changes: 1 addition & 1 deletion integration/systemd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestSystemdMode(t *testing.T) {
MaxMemoryPerInstance: 0,
}

instanceManager := instances.NewManager(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, "", nil, nil)
instanceManager := instances.NewManager(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, "", instances.SnapshotPolicy{}, nil, nil)

// Cleanup any orphaned instances
t.Cleanup(func() {
Expand Down
2 changes: 1 addition & 1 deletion integration/vgpu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestVGPU(t *testing.T) {
MaxMemoryPerInstance: 0,
}

instanceManager := instances.NewManager(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, "", nil, nil)
instanceManager := instances.NewManager(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, "", instances.SnapshotPolicy{}, nil, nil)

// Track instance ID for cleanup
var instanceID string
Expand Down
2 changes: 1 addition & 1 deletion lib/builds/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (m *mockInstanceManager) ForkSnapshot(ctx context.Context, snapshotID strin
return nil, instances.ErrNotFound
}

func (m *mockInstanceManager) StandbyInstance(ctx context.Context, id string) (*instances.Instance, error) {
func (m *mockInstanceManager) StandbyInstance(ctx context.Context, id string, req instances.StandbyInstanceRequest) (*instances.Instance, error) {
return nil, nil
}

Expand Down
2 changes: 1 addition & 1 deletion lib/devices/gpu_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestGPUPassthrough(t *testing.T) {
limits := instances.ResourceLimits{
MaxOverlaySize: 100 * 1024 * 1024 * 1024, // 100GB
}
instanceMgr := instances.NewManager(p, imageMgr, systemMgr, networkMgr, deviceMgr, volumeMgr, limits, "", nil, nil)
instanceMgr := instances.NewManager(p, imageMgr, systemMgr, networkMgr, deviceMgr, volumeMgr, limits, "", instances.SnapshotPolicy{}, nil, nil)

// Step 1: Discover available GPUs
t.Log("Step 1: Discovering available GPUs...")
Expand Down
2 changes: 1 addition & 1 deletion lib/devices/gpu_inference_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestGPUInference(t *testing.T) {
limits := instances.ResourceLimits{
MaxOverlaySize: 100 * 1024 * 1024 * 1024,
}
instanceMgr := instances.NewManager(p, imageMgr, systemMgr, networkMgr, deviceMgr, volumeMgr, limits, "", nil, nil)
instanceMgr := instances.NewManager(p, imageMgr, systemMgr, networkMgr, deviceMgr, volumeMgr, limits, "", instances.SnapshotPolicy{}, nil, nil)

// Step 1: Build custom CUDA+Ollama image
t.Log("Step 1: Building custom CUDA+Ollama Docker image...")
Expand Down
4 changes: 2 additions & 2 deletions lib/devices/gpu_module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestNVIDIAModuleLoading(t *testing.T) {
deviceMgr := devices.NewManager(p)
volumeMgr := volumes.NewManager(p, 10*1024*1024*1024, nil)
limits := instances.ResourceLimits{MaxOverlaySize: 10 * 1024 * 1024 * 1024}
instanceMgr := instances.NewManager(p, imageMgr, systemMgr, networkMgr, deviceMgr, volumeMgr, limits, "", nil, nil)
instanceMgr := instances.NewManager(p, imageMgr, systemMgr, networkMgr, deviceMgr, volumeMgr, limits, "", instances.SnapshotPolicy{}, nil, nil)

// Step 1: Find an NVIDIA GPU
t.Log("Step 1: Discovering available GPUs...")
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestNVMLDetection(t *testing.T) {
deviceMgr := devices.NewManager(p)
volumeMgr := volumes.NewManager(p, 10*1024*1024*1024, nil)
limits := instances.ResourceLimits{MaxOverlaySize: 10 * 1024 * 1024 * 1024}
instanceMgr := instances.NewManager(p, imageMgr, systemMgr, networkMgr, deviceMgr, volumeMgr, limits, "", nil, nil)
instanceMgr := instances.NewManager(p, imageMgr, systemMgr, networkMgr, deviceMgr, volumeMgr, limits, "", instances.SnapshotPolicy{}, nil, nil)

// Step 1: Check if ollama-cuda:test image exists in Docker
t.Log("Step 1: Checking for ollama-cuda:test Docker image...")
Expand Down
6 changes: 6 additions & 0 deletions lib/instances/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ func (m *manager) createInstance(
Cmd: req.Cmd,
SkipKernelHeaders: req.SkipKernelHeaders,
SkipGuestAgent: req.SkipGuestAgent,
SnapshotPolicy: cloneSnapshotPolicy(req.SnapshotPolicy),
}

// 12. Ensure directories
Expand Down Expand Up @@ -470,6 +471,11 @@ func validateCreateRequest(req CreateInstanceRequest) error {
if err := tags.Validate(req.Metadata); err != nil {
return fmt.Errorf("%w: %v", ErrInvalidRequest, err)
}
if req.SnapshotPolicy != nil && req.SnapshotPolicy.Compression != nil {
if _, err := normalizeCompressionConfig(req.SnapshotPolicy.Compression); err != nil {
return err
}
}

// Validate volume attachments
if err := validateVolumeAttachments(req.Volumes); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions lib/instances/firecracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func setupTestManagerForFirecracker(t *testing.T) (*manager, string) {
MaxVcpusPerInstance: 0,
MaxMemoryPerInstance: 0,
}
mgr := NewManager(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, hypervisor.TypeFirecracker, nil, nil).(*manager)
mgr := NewManager(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, hypervisor.TypeFirecracker, SnapshotPolicy{}, nil, nil).(*manager)

resourceMgr := resources.NewManager(cfg, p)
resourceMgr.SetInstanceLister(mgr)
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestFirecrackerStandbyAndRestore(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, StateRunning, inst.State)

inst, err = mgr.StandbyInstance(ctx, inst.Id)
inst, err = mgr.StandbyInstance(ctx, inst.Id, StandbyInstanceRequest{})
require.NoError(t, err)
assert.Equal(t, StateStandby, inst.State)
assert.True(t, inst.HasSnapshot)
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestFirecrackerStopClearsStaleSnapshot(t *testing.T) {
require.Equal(t, StateRunning, inst.State)

// Establish a realistic standby/restore lifecycle first.
inst, err = mgr.StandbyInstance(ctx, inst.Id)
inst, err = mgr.StandbyInstance(ctx, inst.Id, StandbyInstanceRequest{})
require.NoError(t, err)
require.Equal(t, StateStandby, inst.State)
require.True(t, inst.HasSnapshot)
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestFirecrackerNetworkLifecycle(t *testing.T) {
require.Equal(t, 0, exitCode)
require.Contains(t, output, "Connection successful")

inst, err = mgr.StandbyInstance(ctx, inst.Id)
inst, err = mgr.StandbyInstance(ctx, inst.Id, StandbyInstanceRequest{})
require.NoError(t, err)
assert.Equal(t, StateStandby, inst.State)
assert.True(t, inst.HasSnapshot)
Expand Down
6 changes: 3 additions & 3 deletions lib/instances/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (m *manager) forkInstance(ctx context.Context, id string, req ForkInstanceR

log.InfoContext(ctx, "fork from running requested; transitioning source to standby",
"source_instance_id", id, "hypervisor", source.HypervisorType)
if _, err := m.standbyInstance(ctx, id); err != nil {
if _, err := m.standbyInstance(ctx, id, StandbyInstanceRequest{}, true); err != nil {
return nil, "", fmt.Errorf("standby source instance: %w", err)
}

Expand Down Expand Up @@ -421,7 +421,7 @@ func (m *manager) applyForkTargetState(ctx context.Context, forkID string, targe
if _, err := m.startInstance(ctx, forkID, StartInstanceRequest{}); err != nil {
return nil, fmt.Errorf("start forked instance for standby transition: %w", err)
}
return returnWithReadiness(m.standbyInstance(ctx, forkID))
return returnWithReadiness(m.standbyInstance(ctx, forkID, StandbyInstanceRequest{}, false))
}
case StateStandby:
switch target {
Expand All @@ -436,7 +436,7 @@ func (m *manager) applyForkTargetState(ctx context.Context, forkID string, targe
case StateRunning:
switch target {
case StateStandby:
return returnWithReadiness(m.standbyInstance(ctx, forkID))
return returnWithReadiness(m.standbyInstance(ctx, forkID, StandbyInstanceRequest{}, false))
case StateStopped:
return returnWithReadiness(m.stopInstance(ctx, forkID))
}
Expand Down
Loading
Loading