IF YOU WOULD LIKE TO GET AN ACCOUNT, please write an email to s dot adaszewski at gmail dot com. User accounts are meant only to report issues and/or generate pull requests. This is a purpose-specific Git hosting for ADARED projects. Thank you for your understanding!
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

220 lines
9.5KB

  1. import pytest
  2. import subprocess
  3. from focker.volume import command_volume_create, \
  4. command_volume_prune, \
  5. command_volume_list, \
  6. command_volume_tag, \
  7. command_volume_untag, \
  8. command_volume_remove, \
  9. command_volume_set, \
  10. command_volume_get, \
  11. command_volume_protect, \
  12. command_volume_unprotect
  13. from focker.zfs import zfs_find, \
  14. zfs_mountpoint, \
  15. zfs_exists, \
  16. zfs_parse_output, \
  17. zfs_destroy, \
  18. zfs_prune, \
  19. zfs_run
  20. import os
  21. import focker.volume
  22. import focker.zfs
  23. def test_command_volume_create():
  24. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-create'])
  25. args = lambda: 0
  26. args.tags = ['test-command-volume-create']
  27. command_volume_create(args)
  28. name, sha256 = zfs_find('test-command-volume-create', focker_type='volume')
  29. assert os.path.exists(zfs_mountpoint(name))
  30. subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-create'])
  31. def test_command_volume_prune():
  32. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-prune'])
  33. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-prune'])
  34. name, sha256 = zfs_find('test-command-volume-prune', focker_type='volume')
  35. mountpoint = zfs_mountpoint(name)
  36. subprocess.check_output(['focker', 'volume', 'untag', 'test-command-volume-prune'])
  37. args = lambda: 0
  38. command_volume_prune(args)
  39. assert not zfs_exists(name)
  40. assert not os.path.exists(mountpoint)
  41. def test_command_volume_list(monkeypatch):
  42. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-list'])
  43. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-list', 'test-command-volume-list-1', 'test-command-volume-list-2'])
  44. name, sha256 = zfs_find('test-command-volume-list', focker_type='volume')
  45. mountpoint = zfs_mountpoint(name)
  46. args = lambda: 0
  47. args.full_sha256 = True
  48. lst = None
  49. headers = None
  50. def fake_tabulate(*args, **kwargs):
  51. nonlocal lst
  52. nonlocal headers
  53. lst = args[0]
  54. headers = kwargs['headers']
  55. monkeypatch.setattr(focker.volume, 'tabulate', fake_tabulate)
  56. command_volume_list(args)
  57. assert lst is not None
  58. assert headers == ['Tags', 'Size', 'SHA256', 'Mountpoint']
  59. assert len(lst) >= 3
  60. match = list(filter(lambda a: sorted(a[0].split(' ')) == ['test-command-volume-list', 'test-command-volume-list-1', 'test-command-volume-list-2'], lst))
  61. assert len(match) == 1
  62. match = match[0]
  63. assert match[2] == sha256
  64. assert match[3] == mountpoint
  65. subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-list'])
  66. def test_command_volume_tag():
  67. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-tag'])
  68. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-tag'])
  69. name_1, sha256_1 = zfs_find('test-command-volume-tag', focker_type='volume')
  70. args = lambda: 0
  71. args.reference = sha256_1
  72. args.tags = ['test-a', 'test-b', 'test-c']
  73. command_volume_tag(args)
  74. for t in args.tags:
  75. name_2, sha256_2 = zfs_find(t, focker_type='volume')
  76. assert name_2 == name_1
  77. assert sha256_2 == sha256_1
  78. subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-tag'])
  79. for t in args.tags:
  80. with pytest.raises(ValueError):
  81. zfs_find(t, focker_type='volume')
  82. with pytest.raises(ValueError):
  83. zfs_find('test-command-volume-tag', focker_type='volume')
  84. def test_command_volume_untag():
  85. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-untag'])
  86. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-untag', 'test-command-volume-untag-1', 'test-command-volume-untag-2'])
  87. name, sha256 = zfs_find('test-command-volume-untag', focker_type='volume')
  88. args = lambda: 0
  89. args.tags = ['test-command-volume-untag-1', 'test-command-volume-untag-2']
  90. command_volume_untag(args)
  91. tags = zfs_parse_output(['zfs', 'get', '-H', 'focker:tags', name])
  92. tags = tags[0][2].split(',')
  93. assert tags == ['test-command-volume-untag']
  94. with pytest.raises(ValueError):
  95. zfs_find('test-command-volume-untag-1', focker_type='volume')
  96. with pytest.raises(ValueError):
  97. zfs_find('test-command-image-untag-2', focker_type='volume')
  98. subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-untag'])
  99. def test_command_volume_remove():
  100. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-remove'])
  101. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-remove'])
  102. name, sha256 = zfs_find('test-command-volume-remove', focker_type='volume')
  103. mountpoint = zfs_mountpoint(name)
  104. args = lambda: 0
  105. args.references = ['test-command-volume-remove']
  106. args.force = False
  107. command_volume_remove(args)
  108. with pytest.raises(ValueError):
  109. zfs_find('test-command-volume-remove', focker_type='volume')
  110. with pytest.raises(ValueError):
  111. zfs_find(sha256, focker_type='volume')
  112. assert not os.path.exists(mountpoint)
  113. assert not zfs_exists(name)
  114. def test_command_volume_set():
  115. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-set'])
  116. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-set'])
  117. name, sha256 = zfs_find('test-command-volume-set', focker_type='volume')
  118. args = lambda: 0
  119. args.reference = 'test-command-volume-set'
  120. args.properties = ['rdonly=on', 'quota=1G']
  121. command_volume_set(args)
  122. props = zfs_parse_output(['zfs', 'get', '-H', 'rdonly,quota', name])
  123. assert len(props) == 2
  124. for i in range(2):
  125. assert props[i][0] == name
  126. assert props[0][1] == 'readonly'
  127. assert props[1][1] == 'quota'
  128. assert props[0][2] == 'on'
  129. assert props[1][2] == '1G'
  130. subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-set'])
  131. def test_command_volume_get(monkeypatch):
  132. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-get'])
  133. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-get'])
  134. subprocess.check_output(['focker', 'volume', 'set', 'test-command-volume-get', 'rdonly=on', 'quota=1G'])
  135. name, sha256 = zfs_find('test-command-volume-get', focker_type='volume')
  136. args = lambda: 0
  137. args.reference = 'test-command-volume-get'
  138. args.properties = ['rdonly', 'quota']
  139. lst = None
  140. headers = None
  141. def fake_tabulate(*args, **kwargs):
  142. nonlocal lst
  143. nonlocal headers
  144. lst = args[0]
  145. headers = kwargs['headers']
  146. monkeypatch.setattr(focker.volume, 'tabulate', fake_tabulate)
  147. command_volume_get(args)
  148. assert lst is not None
  149. assert headers is not None
  150. assert lst == [ ['rdonly', 'on'], ['quota', '1G'] ]
  151. assert headers == [ 'Property', 'Value' ]
  152. # assert lst == ['on', '1G']
  153. subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-get'])
  154. def test_command_volume_protect(monkeypatch):
  155. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-protect'])
  156. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-protect'])
  157. args = lambda: 0
  158. args.references = ['test-command-volume-protect']
  159. command_volume_protect(args)
  160. name, sha256 = zfs_find('test-command-volume-protect', focker_type='volume')
  161. mountpoint = zfs_mountpoint(name)
  162. lst = zfs_parse_output(['zfs', 'get', '-H', 'focker:protect', name])
  163. assert len(lst) == 1
  164. assert lst[0][2] == 'on'
  165. with pytest.raises(RuntimeError):
  166. zfs_destroy(name)
  167. subprocess.check_output(['focker', 'volume', 'untag', 'test-command-volume-protect'])
  168. lst = zfs_parse_output(['zfs', 'get', '-H', 'focker:tags', name])
  169. assert len(lst) == 1
  170. assert lst[0][2] == '-'
  171. n_called = 0
  172. def fake_run(*args, **kwargs):
  173. nonlocal n_called
  174. n_called += 1
  175. return zfs_run(*args, **kwargs)
  176. monkeypatch.setattr(focker.zfs, 'zfs_run', fake_run)
  177. zfs_prune(focker_type='volume')
  178. assert not n_called == 1
  179. with pytest.raises(subprocess.CalledProcessError):
  180. subprocess.check_output(['focker', 'volume', 'remove', sha256])
  181. subprocess.check_output(['zfs', 'destroy', '-r', '-f', name])
  182. assert not zfs_exists(name)
  183. assert not os.path.exists(mountpoint)
  184. def test_command_volume_unprotect():
  185. subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-unprotect'])
  186. subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-unprotect'])
  187. subprocess.check_output(['focker', 'volume', 'protect', 'test-command-volume-unprotect'])
  188. name, _ = zfs_find('test-command-volume-unprotect', focker_type='volume')
  189. lst = zfs_parse_output(['zfs', 'get', '-H', 'focker:protect', name])
  190. assert len(lst) == 1
  191. assert lst[0][2] == 'on'
  192. args = lambda: 0
  193. args.references = ['test-command-volume-unprotect']
  194. command_volume_unprotect(args)
  195. lst = zfs_parse_output(['zfs', 'get', '-H', 'focker:protect', name])
  196. assert len(lst) == 1
  197. assert lst[0][2] == '-'
  198. subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-unprotect'])