|
1
|
"""Tests for the fossilrepo-ctl bundle export/import commands.""" |
|
2
|
|
|
3
|
from unittest.mock import MagicMock, patch |
|
4
|
|
|
5
|
import pytest |
|
6
|
from click.testing import CliRunner |
|
7
|
|
|
8
|
from ctl.main import cli |
|
9
|
|
|
10
|
|
|
11
|
@pytest.fixture
def runner():
    """Provide a new Click ``CliRunner`` instance to each test that requests it."""
    cli_runner = CliRunner()
    return cli_runner
|
14
|
|
|
15
|
|
|
16
|
@pytest.mark.django_db
class TestBundleExport:
    """Exercise ``bundle export`` through the Click test runner.

    Each scenario invokes the CLI in-process and asserts both the exit code
    (always 0 here, including the error paths) and a fragment of the printed
    message. Output paths live under pytest's ``tmp_path`` so the tests never
    depend on, or leave files in, a shared ``/tmp`` directory.
    """

    def test_export_missing_project(self, runner, tmp_path):
        """Export with a non-existent project slug prints an error."""
        output_path = tmp_path / "out.bundle"

        result = runner.invoke(cli, ["bundle", "export", "nonexistent-project", str(output_path)])

        assert result.exit_code == 0
        assert "No repository found" in result.output

    def test_export_repo_not_on_disk(self, runner, sample_project, tmp_path):
        """Export when the .fossil file does not exist on disk."""
        from fossil.models import FossilRepository

        repo = FossilRepository.objects.get(project=sample_project)
        output_path = tmp_path / "out.bundle"

        # Replace the class-level attribute so every instance the command
        # loads reports that its repository file is missing.
        with patch.object(type(repo), "exists_on_disk", new=property(lambda self: False)):
            result = runner.invoke(cli, ["bundle", "export", sample_project.slug, str(output_path)])

        assert result.exit_code == 0
        assert "not found on disk" in result.output

    def test_export_fossil_not_available(self, runner, sample_project, tmp_path):
        """Export when fossil binary is not found."""
        from fossil.models import FossilRepository

        repo = FossilRepository.objects.get(project=sample_project)
        output_path = tmp_path / "out.bundle"

        with (
            patch.object(type(repo), "exists_on_disk", new=property(lambda self: True)),
            patch("fossil.cli.FossilCLI.is_available", return_value=False),
        ):
            result = runner.invoke(cli, ["bundle", "export", sample_project.slug, str(output_path)])

        assert result.exit_code == 0
        assert "Fossil binary not found" in result.output

    def test_export_success(self, runner, sample_project, tmp_path):
        """Export succeeds when fossil binary is available and returns 0."""
        from fossil.models import FossilRepository

        repo = FossilRepository.objects.get(project=sample_project)
        output_path = tmp_path / "test.bundle"

        # Stand-in for the object subprocess.run returns on success.
        completed = MagicMock(returncode=0, stdout="", stderr="")

        with (
            patch.object(type(repo), "exists_on_disk", new=property(lambda self: True)),
            patch("fossil.cli.FossilCLI.is_available", return_value=True),
            patch("subprocess.run", return_value=completed),
        ):
            # subprocess.run is mocked, so nothing else writes the bundle;
            # create a fake output file so size calculation works.
            output_path.write_bytes(b"x" * 1024)
            result = runner.invoke(cli, ["bundle", "export", sample_project.slug, str(output_path)])

        assert result.exit_code == 0
        assert "Success" in result.output

    def test_export_failure(self, runner, sample_project, tmp_path):
        """Export reports failure when fossil returns non-zero."""
        from fossil.models import FossilRepository

        repo = FossilRepository.objects.get(project=sample_project)
        output_path = tmp_path / "test.bundle"

        # Stand-in for the object subprocess.run returns on a failed run.
        completed = MagicMock(returncode=1, stdout="", stderr="bundle export failed")

        with (
            patch.object(type(repo), "exists_on_disk", new=property(lambda self: True)),
            patch("fossil.cli.FossilCLI.is_available", return_value=True),
            patch("subprocess.run", return_value=completed),
        ):
            result = runner.invoke(cli, ["bundle", "export", sample_project.slug, str(output_path)])

        assert result.exit_code == 0
        assert "Failed" in result.output
|
90
|
|
|
91
|
|
|
92
|
@pytest.mark.django_db
class TestBundleImport:
    """Exercise ``bundle import`` through the Click test runner.

    Each scenario invokes the CLI in-process and asserts both the exit code
    (always 0 here, including the error paths) and a fragment of the printed
    message. Bundle paths live under pytest's ``tmp_path`` so the
    "file is missing" scenarios cannot be affected by stray files in a shared
    ``/tmp`` directory.
    """

    def test_import_missing_project(self, runner, tmp_path):
        """Import with a non-existent project slug prints an error."""
        bundle_file = tmp_path / "in.bundle"

        result = runner.invoke(cli, ["bundle", "import", "nonexistent-project", str(bundle_file)])

        assert result.exit_code == 0
        assert "No repository found" in result.output

    def test_import_bundle_file_not_found(self, runner, sample_project, tmp_path):
        """Import when the bundle file does not exist."""
        from fossil.models import FossilRepository

        repo = FossilRepository.objects.get(project=sample_project)
        missing_bundle = tmp_path / "definitely-not-a-file.bundle"
        # tmp_path is unique per test, so the file is guaranteed to be absent.
        assert not missing_bundle.exists()

        with patch.object(type(repo), "exists_on_disk", new=property(lambda self: True)):
            result = runner.invoke(cli, ["bundle", "import", sample_project.slug, str(missing_bundle)])

        assert result.exit_code == 0
        assert "not found" in result.output.lower()

    def test_import_success(self, runner, sample_project, tmp_path):
        """Import succeeds when fossil binary is available and returns 0."""
        from fossil.models import FossilRepository

        repo = FossilRepository.objects.get(project=sample_project)
        bundle_file = tmp_path / "test.bundle"
        bundle_file.write_bytes(b"fake-bundle")

        # Stand-in for the object subprocess.run returns on success.
        completed = MagicMock(returncode=0, stdout="imported 42 artifacts", stderr="")

        with (
            patch.object(type(repo), "exists_on_disk", new=property(lambda self: True)),
            patch("fossil.cli.FossilCLI.is_available", return_value=True),
            patch("subprocess.run", return_value=completed),
        ):
            result = runner.invoke(cli, ["bundle", "import", sample_project.slug, str(bundle_file)])

        assert result.exit_code == 0
        assert "Success" in result.output

    def test_import_failure(self, runner, sample_project, tmp_path):
        """Import reports failure when fossil returns non-zero."""
        from fossil.models import FossilRepository

        repo = FossilRepository.objects.get(project=sample_project)
        bundle_file = tmp_path / "test.bundle"
        bundle_file.write_bytes(b"fake-bundle")

        # Stand-in for the object subprocess.run returns on a failed run.
        completed = MagicMock(returncode=1, stdout="", stderr="invalid bundle format")

        with (
            patch.object(type(repo), "exists_on_disk", new=property(lambda self: True)),
            patch("fossil.cli.FossilCLI.is_available", return_value=True),
            patch("subprocess.run", return_value=completed),
        ):
            result = runner.invoke(cli, ["bundle", "import", sample_project.slug, str(bundle_file)])

        assert result.exit_code == 0
        assert "Failed" in result.output
|
154
|
|
|
155
|
|
|
156
|
class TestBundleCLIGroup:
    """Smoke tests for the ``--help`` output of the ``bundle`` command group."""

    def test_bundle_help(self, runner):
        """Bundle group shows help text."""
        outcome = runner.invoke(cli, ["bundle", "--help"])
        assert outcome.exit_code == 0

        # Compare case-insensitively, as both subcommand names are checked in lowercase.
        help_text = outcome.output.lower()
        assert "export" in help_text
        assert "import" in help_text

    def test_export_help(self, runner):
        """Help for ``bundle export`` mentions the PROJECT_SLUG argument."""
        argv = ["bundle", "export", "--help"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        assert "PROJECT_SLUG" in outcome.output

    def test_import_help(self, runner):
        """Help for ``bundle import`` mentions the PROJECT_SLUG argument."""
        argv = ["bundle", "import", "--help"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        assert "PROJECT_SLUG" in outcome.output
|
173
|
|