IF YOU WOULD LIKE TO GET AN ACCOUNT, please write an email to s dot adaszewski at gmail dot com. User accounts are meant only to report issues and/or generate pull requests. This is a purpose-specific Git hosting for ADARED projects. Thank you for your understanding!
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

165 lignes
7.1KB

  1. import pytest
  2. import subprocess
  3. from focker.volume import command_volume_create, \
  4. command_volume_prune, \
  5. command_volume_list, \
  6. command_volume_tag, \
  7. command_volume_untag, \
  8. command_volume_remove, \
  9. command_volume_set, \
  10. command_volume_get
  11. from focker.zfs import zfs_find, \
  12. zfs_mountpoint, \
  13. zfs_exists, \
  14. zfs_parse_output
  15. import os
  16. import focker.volume
  17. def test_command_volume_create():
  18. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-create'])
  19. args = lambda: 0
  20. args.tags = ['test-command-volume-create']
  21. command_volume_create(args)
  22. name, sha256 = zfs_find('test-command-volume-create', focker_type='volume')
  23. assert os.path.exists(zfs_mountpoint(name))
  24. subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-create'])
  25. def test_command_volume_prune():
  26. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-prune'])
  27. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-prune'])
  28. name, sha256 = zfs_find('test-command-volume-prune', focker_type='volume')
  29. mountpoint = zfs_mountpoint(name)
  30. subprocess.check_output(['focker', 'volume', 'untag', 'test-command-volume-prune'])
  31. args = lambda: 0
  32. command_volume_prune(args)
  33. assert not zfs_exists(name)
  34. assert not os.path.exists(mountpoint)
  35. def test_command_volume_list(monkeypatch):
  36. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-list'])
  37. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-list', 'test-command-volume-list-1', 'test-command-volume-list-2'])
  38. name, sha256 = zfs_find('test-command-volume-list', focker_type='volume')
  39. mountpoint = zfs_mountpoint(name)
  40. args = lambda: 0
  41. args.full_sha256 = True
  42. lst = None
  43. headers = None
  44. def fake_tabulate(*args, **kwargs):
  45. nonlocal lst
  46. nonlocal headers
  47. lst = args[0]
  48. headers = kwargs['headers']
  49. monkeypatch.setattr(focker.volume, 'tabulate', fake_tabulate)
  50. command_volume_list(args)
  51. assert lst is not None
  52. assert headers == ['Tags', 'Size', 'SHA256', 'Mountpoint']
  53. assert len(lst) >= 3
  54. match = list(filter(lambda a: sorted(a[0].split(' ')) == ['test-command-volume-list', 'test-command-volume-list-1', 'test-command-volume-list-2'], lst))
  55. assert len(match) == 1
  56. match = match[0]
  57. assert match[2] == sha256
  58. assert match[3] == mountpoint
  59. subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-list'])
  60. def test_command_volume_tag():
  61. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-tag'])
  62. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-tag'])
  63. name_1, sha256_1 = zfs_find('test-command-volume-tag', focker_type='volume')
  64. args = lambda: 0
  65. args.reference = sha256_1
  66. args.tags = ['test-a', 'test-b', 'test-c']
  67. command_volume_tag(args)
  68. for t in args.tags:
  69. name_2, sha256_2 = zfs_find(t, focker_type='volume')
  70. assert name_2 == name_1
  71. assert sha256_2 == sha256_1
  72. subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-tag'])
  73. for t in args.tags:
  74. with pytest.raises(ValueError):
  75. zfs_find(t, focker_type='volume')
  76. with pytest.raises(ValueError):
  77. zfs_find('test-command-volume-tag', focker_type='volume')
  78. def test_command_volume_untag():
  79. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-untag'])
  80. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-untag', 'test-command-volume-untag-1', 'test-command-volume-untag-2'])
  81. name, sha256 = zfs_find('test-command-volume-untag', focker_type='volume')
  82. args = lambda: 0
  83. args.tags = ['test-command-volume-untag-1', 'test-command-volume-untag-2']
  84. command_volume_untag(args)
  85. tags = zfs_parse_output(['zfs', 'get', '-H', 'focker:tags', name])
  86. tags = tags[0][2].split(',')
  87. assert tags == ['test-command-volume-untag']
  88. with pytest.raises(ValueError):
  89. zfs_find('test-command-volume-untag-1', focker_type='volume')
  90. with pytest.raises(ValueError):
  91. zfs_find('test-command-image-untag-2', focker_type='volume')
  92. subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-untag'])
  93. def test_command_volume_remove():
  94. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-remove'])
  95. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-remove'])
  96. name, sha256 = zfs_find('test-command-volume-remove', focker_type='volume')
  97. mountpoint = zfs_mountpoint(name)
  98. args = lambda: 0
  99. args.references = ['test-command-volume-remove']
  100. args.force = False
  101. command_volume_remove(args)
  102. with pytest.raises(ValueError):
  103. zfs_find('test-command-volume-remove', focker_type='volume')
  104. with pytest.raises(ValueError):
  105. zfs_find(sha256, focker_type='volume')
  106. assert not os.path.exists(mountpoint)
  107. assert not zfs_exists(name)
  108. def test_command_volume_set():
  109. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-set'])
  110. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-set'])
  111. name, sha256 = zfs_find('test-command-volume-set', focker_type='volume')
  112. args = lambda: 0
  113. args.reference = 'test-command-volume-set'
  114. args.properties = ['rdonly=on', 'quota=1G']
  115. command_volume_set(args)
  116. props = zfs_parse_output(['zfs', 'get', '-H', 'rdonly,quota', name])
  117. assert len(props) == 2
  118. for i in range(2):
  119. assert props[i][0] == name
  120. assert props[0][1] == 'readonly'
  121. assert props[1][1] == 'quota'
  122. assert props[0][2] == 'on'
  123. assert props[1][2] == '1G'
  124. subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-set'])
  125. def test_command_volume_get(monkeypatch):
  126. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-get'])
  127. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-get'])
  128. subprocess.check_output(['focker', 'volume', 'set', 'test-command-volume-get', 'rdonly=on', 'quota=1G'])
  129. name, sha256 = zfs_find('test-command-volume-get', focker_type='volume')
  130. args = lambda: 0
  131. args.reference = 'test-command-volume-get'
  132. args.properties = ['rdonly', 'quota']
  133. lst = None
  134. headers = None
  135. def fake_tabulate(*args, **kwargs):
  136. nonlocal lst
  137. nonlocal headers
  138. lst = args[0]
  139. headers = kwargs['headers']
  140. monkeypatch.setattr(focker.volume, 'tabulate', fake_tabulate)
  141. command_volume_get(args)
  142. assert lst is not None
  143. assert headers is not None
  144. assert lst == [ ['rdonly', 'on'], ['quota', '1G'] ]
  145. assert headers == [ 'Property', 'Value' ]
  146. # assert lst == ['on', '1G']
  147. subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-get'])