IF YOU WOULD LIKE TO GET AN ACCOUNT, please write an email to s dot adaszewski at gmail dot com. User accounts are meant only to report issues and/or generate pull requests. This is a purpose-specific Git hosting for ADARED projects. Thank you for your understanding!
Stanislaw Adaszewski 4 роки тому
джерело
коміт
677f587715
2 змінених файлів з 166 додано та 2 видалено
  1. +2
    -2
      focker/volume.py
  2. +164
    -0
      tests/test_volume.py

+ 2
- 2
focker/volume.py Переглянути файл

@@ -69,5 +69,5 @@ def command_volume_get(args):
if not args.properties:
raise ValueError('You must specify some properties')
res = zfs_parse_output(['zfs', 'get', '-H', ','.join(args.properties), name])
res = map(lambda a: a[2], res)
print('\n'.join(res))
res = [ [ args.properties[i], a[2] ] for i, a in enumerate(res) ]
print(tabulate(res, headers=['Property', 'Value']))

+ 164
- 0
tests/test_volume.py Переглянути файл

@@ -0,0 +1,164 @@
import pytest
import subprocess
from focker.volume import command_volume_create, \
command_volume_prune, \
command_volume_list, \
command_volume_tag, \
command_volume_untag, \
command_volume_remove, \
command_volume_set, \
command_volume_get
from focker.zfs import zfs_find, \
zfs_mountpoint, \
zfs_exists, \
zfs_parse_output
import os
import focker.volume
def test_command_volume_create():
subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-create'])
args = lambda: 0
args.tags = ['test-command-volume-create']
command_volume_create(args)
name, sha256 = zfs_find('test-command-volume-create', focker_type='volume')
assert os.path.exists(zfs_mountpoint(name))
subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-create'])
def test_command_volume_prune():
subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-prune'])
subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-prune'])
name, sha256 = zfs_find('test-command-volume-prune', focker_type='volume')
mountpoint = zfs_mountpoint(name)
subprocess.check_output(['focker', 'volume', 'untag', 'test-command-volume-prune'])
args = lambda: 0
command_volume_prune(args)
assert not zfs_exists(name)
assert not os.path.exists(mountpoint)
def test_command_volume_list(monkeypatch):
subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-list'])
subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-list', 'test-command-volume-list-1', 'test-command-volume-list-2'])
name, sha256 = zfs_find('test-command-volume-list', focker_type='volume')
mountpoint = zfs_mountpoint(name)
args = lambda: 0
args.full_sha256 = True
lst = None
headers = None
def fake_tabulate(*args, **kwargs):
nonlocal lst
nonlocal headers
lst = args[0]
headers = kwargs['headers']
monkeypatch.setattr(focker.volume, 'tabulate', fake_tabulate)
command_volume_list(args)
assert lst is not None
assert headers == ['Tags', 'Size', 'SHA256', 'Mountpoint']
assert len(lst) >= 3
match = list(filter(lambda a: sorted(a[0].split(' ')) == ['test-command-volume-list', 'test-command-volume-list-1', 'test-command-volume-list-2'], lst))
assert len(match) == 1
match = match[0]
assert match[2] == sha256
assert match[3] == mountpoint
subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-list'])
def test_command_volume_tag():
subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-tag'])
subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-tag'])
name_1, sha256_1 = zfs_find('test-command-volume-tag', focker_type='volume')
args = lambda: 0
args.reference = sha256_1
args.tags = ['test-a', 'test-b', 'test-c']
command_volume_tag(args)
for t in args.tags:
name_2, sha256_2 = zfs_find(t, focker_type='volume')
assert name_2 == name_1
assert sha256_2 == sha256_1
subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-tag'])
for t in args.tags:
with pytest.raises(ValueError):
zfs_find(t, focker_type='volume')
with pytest.raises(ValueError):
zfs_find('test-command-volume-tag', focker_type='volume')
def test_command_volume_untag():
subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-untag'])
subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-untag', 'test-command-volume-untag-1', 'test-command-volume-untag-2'])
name, sha256 = zfs_find('test-command-volume-untag', focker_type='volume')
args = lambda: 0
args.tags = ['test-command-volume-untag-1', 'test-command-volume-untag-2']
command_volume_untag(args)
tags = zfs_parse_output(['zfs', 'get', '-H', 'focker:tags', name])
tags = tags[0][2].split(',')
assert tags == ['test-command-volume-untag']
with pytest.raises(ValueError):
zfs_find('test-command-volume-untag-1', focker_type='volume')
with pytest.raises(ValueError):
zfs_find('test-command-image-untag-2', focker_type='volume')
subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-untag'])
def test_command_volume_remove():
subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-remove'])
subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-remove'])
name, sha256 = zfs_find('test-command-volume-remove', focker_type='volume')
mountpoint = zfs_mountpoint(name)
args = lambda: 0
args.references = ['test-command-volume-remove']
args.force = False
command_volume_remove(args)
with pytest.raises(ValueError):
zfs_find('test-command-volume-remove', focker_type='volume')
with pytest.raises(ValueError):
zfs_find(sha256, focker_type='volume')
assert not os.path.exists(mountpoint)
assert not zfs_exists(name)
def test_command_volume_set():
subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-set'])
subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-set'])
name, sha256 = zfs_find('test-command-volume-set', focker_type='volume')
args = lambda: 0
args.reference = 'test-command-volume-set'
args.properties = ['rdonly=on', 'quota=1G']
command_volume_set(args)
props = zfs_parse_output(['zfs', 'get', '-H', 'rdonly,quota', name])
assert len(props) == 2
for i in range(2):
assert props[i][0] == name
assert props[0][1] == 'readonly'
assert props[1][1] == 'quota'
assert props[0][2] == 'on'
assert props[1][2] == '1G'
subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-set'])
def test_command_volume_get(monkeypatch):
subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-get'])
subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-get'])
subprocess.check_output(['focker', 'volume', 'set', 'test-command-volume-get', 'rdonly=on', 'quota=1G'])
name, sha256 = zfs_find('test-command-volume-get', focker_type='volume')
args = lambda: 0
args.reference = 'test-command-volume-get'
args.properties = ['rdonly', 'quota']
lst = None
headers = None
def fake_tabulate(*args, **kwargs):
nonlocal lst
nonlocal headers
lst = args[0]
headers = kwargs['headers']
monkeypatch.setattr(focker.volume, 'tabulate', fake_tabulate)
command_volume_get(args)
assert lst is not None
assert headers is not None
assert lst == [ ['rdonly', 'on'], ['quota', '1G'] ]
assert headers == [ 'Property', 'Value' ]
# assert lst == ['on', '1G']
subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-get'])

Завантаження…
Відмінити
Зберегти