git: 20d2e961ee73 - main - devel/py-proxmoxer: Fix tests
- Go to: [ bottom of page ] [ top of archives ] [ this month ]
Date: Tue, 24 Dec 2024 08:49:25 UTC
The branch main has been updated by eduardo:
URL: https://cgit.FreeBSD.org/ports/commit/?id=20d2e961ee73f108e45838473147bcccea0c5a7b
commit 20d2e961ee73f108e45838473147bcccea0c5a7b
Author: Nuno Teixeira <eduardo@FreeBSD.org>
AuthorDate: 2024-12-24 08:47:09 +0000
Commit: Nuno Teixeira <eduardo@FreeBSD.org>
CommitDate: 2024-12-24 08:47:09 +0000
devel/py-proxmoxer: Fix tests
Sync PYPI tests with GH released version until fixed upstream
See also: https://github.com/proxmoxer/proxmoxer/issues/195
PR: 283360
---
devel/py-proxmoxer/Makefile | 6 +-
devel/py-proxmoxer/files/extra-patch-tests | 1117 ++++++++++++++++++++++++++++
2 files changed, 1122 insertions(+), 1 deletion(-)
diff --git a/devel/py-proxmoxer/Makefile b/devel/py-proxmoxer/Makefile
index 46b164ef0e90..38863903dec8 100644
--- a/devel/py-proxmoxer/Makefile
+++ b/devel/py-proxmoxer/Makefile
@@ -17,12 +17,16 @@ RUN_DEPENDS= ${PYTHON_PKGNAMEPREFIX}requests>=2.0.0:www/py-requests@${PY_FLAVOR}
TEST_DEPENDS= ${PYTHON_PKGNAMEPREFIX}coveralls>0:devel/py-coveralls@${PY_FLAVOR} \
${PYTHON_PKGNAMEPREFIX}openssh-wrapper>0:security/py-openssh-wrapper@${PY_FLAVOR} \
${PYTHON_PKGNAMEPREFIX}paramiko>0:security/py-paramiko@${PY_FLAVOR} \
+ ${PYTHON_PKGNAMEPREFIX}requests-toolbelt>0:www/py-requests-toolbelt@${PY_FLAVOR} \
${PYTHON_PKGNAMEPREFIX}responses>0:devel/py-responses@${PY_FLAVOR}
USES= python
USE_PYTHON= autoplist pep517 pytest
+BINARY_ALIAS= python3=${PYTHON_CMD}
-TESTING_UNSAFE= https://github.com/proxmoxer/proxmoxer/issues/195
+# Sync PYPI tests with GH released version
+# See also: https://github.com/proxmoxer/proxmoxer/issues/195
+EXTRA_PATCHES= ${FILESDIR}/extra-patch-tests
NO_ARCH= yes
diff --git a/devel/py-proxmoxer/files/extra-patch-tests b/devel/py-proxmoxer/files/extra-patch-tests
new file mode 100644
index 000000000000..e2a5bbf58fb1
--- /dev/null
+++ b/devel/py-proxmoxer/files/extra-patch-tests
@@ -0,0 +1,1117 @@
+Sync PYPI tests with GH released version
+
+diff -ruN tests/__init__.py proxmoxer-2.2.0/tests/__init__.py
+--- tests/__init__.py 1970-01-01 01:00:00.000000000 +0100
++++ proxmoxer-2.2.0/tests/__init__.py 2024-12-15 02:12:42.000000000 +0000
+@@ -0,0 +1,3 @@
++__author__ = "John Hollowell"
++__copyright__ = "(c) John Hollowell 2022"
++__license__ = "MIT"
+diff -ruN tests/api_mock.py proxmoxer-2.2.0/tests/api_mock.py
+--- tests/api_mock.py 1970-01-01 01:00:00.000000000 +0100
++++ proxmoxer-2.2.0/tests/api_mock.py 2024-12-15 02:12:42.000000000 +0000
+@@ -0,0 +1,360 @@
++__author__ = "John Hollowell"
++__copyright__ = "(c) John Hollowell 2022"
++__license__ = "MIT"
++
++import json
++import re
++from urllib.parse import parse_qsl, urlparse
++
++import pytest
++import responses
++from requests_toolbelt import MultipartEncoder
++
++
++@pytest.fixture()
++def mock_pve():
++ with responses.RequestsMock(registry=PVERegistry, assert_all_requests_are_fired=False) as rsps:
++ yield rsps
++
++
++class PVERegistry(responses.registries.FirstMatchRegistry):
++ base_url = "https://1.2.3.4:1234/api2/json"
++
++ common_headers = {
++ "Cache-Control": "max-age=0",
++ "Connection": "close, Keep-Alive",
++ "Pragma": "no-cache",
++ "Server": "pve-api-daemon/3.0",
++ "Content-Type": "application/json;charset=UTF-8",
++ }
++
++ def __init__(self):
++ super().__init__()
++ for resp in self._generate_static_responses():
++ self.add(resp)
++
++ for resp in self._generate_dynamic_responses():
++ self.add(resp)
++
++ def _generate_static_responses(self):
++ resps = []
++
++ # Basic GET requests
++ resps.append(
++ responses.Response(
++ method="GET",
++ url=self.base_url + "/version",
++ json={"data": {"version": "7.2-3", "release": "7.2", "repoid": "c743d6c1"}},
++ )
++ )
++
++ resps.append(
++ responses.Response(
++ method="POST",
++ url=re.compile(self.base_url + r"/nodes/[^/]+/storage/[^/]+/download-url"),
++ # "done" added to UPID so polling will terminate (status checking is tested elsewhere)
++ json={
++ "data": "UPID:node:003094EA:095F1EFE:63E88772:download:file.iso:root@pam:done",
++ "success": 1,
++ },
++ )
++ )
++
++ resps.append(
++ responses.Response(
++ method="POST",
++ url=re.compile(self.base_url + r"/nodes/[^/]+/storage/storage1/upload"),
++ # "done" added to UPID so polling will terminate (status checking is tested elsewhere)
++ json={"data": "UPID:node:0017C594:0ADB2769:63EC5455:imgcopy::root@pam:done"},
++ )
++ )
++ resps.append(
++ responses.Response(
++ method="POST",
++ url=re.compile(self.base_url + r"/nodes/[^/]+/storage/missing/upload"),
++ status=500,
++ body="storage 'missing' does not exist",
++ )
++ )
++
++ return resps
++
++ def _generate_dynamic_responses(self):
++ resps = []
++
++ # Authentication
++ resps.append(
++ responses.CallbackResponse(
++ method="POST",
++ url=self.base_url + "/access/ticket",
++ callback=self._cb_password_auth,
++ )
++ )
++
++ # Session testing
++ resps.append(
++ responses.CallbackResponse(
++ method="GET",
++ url=self.base_url + "/fake/echo",
++ callback=self._cb_echo,
++ )
++ )
++
++ resps.append(
++ responses.CallbackResponse(
++ method="GET",
++ url=re.compile(self.base_url + r"/nodes/[^/]+/qemu/[^/]+/agent/exec"),
++ callback=self._cb_echo,
++ )
++ )
++
++ resps.append(
++ responses.CallbackResponse(
++ method="GET",
++ url=re.compile(self.base_url + r"/nodes/[^/]+/qemu/[^/]+/monitor"),
++ callback=self._cb_qemu_monitor,
++ )
++ )
++
++ resps.append(
++ responses.CallbackResponse(
++ method="GET",
++ url=re.compile(self.base_url + r"/nodes/[^/]+/tasks/[^/]+/status"),
++ callback=self._cb_task_status,
++ )
++ )
++
++ resps.append(
++ responses.CallbackResponse(
++ method="GET",
++ url=re.compile(self.base_url + r"/nodes/[^/]+/query-url-metadata.*"),
++ callback=self._cb_url_metadata,
++ )
++ )
++
++ return resps
++
++ ###################################
++ # Callbacks for Dynamic Responses #
++ ###################################
++
++ def _cb_echo(self, request):
++ body = request.body
++ if body is not None:
++ if isinstance(body, MultipartEncoder):
++ body = body.to_string() # really, to byte string
++ body = body if isinstance(body, str) else str(body, "utf-8")
++
++ resp = {
++ "method": request.method,
++ "url": request.url,
++ "headers": dict(request.headers),
++ "cookies": request._cookies.get_dict(),
++ "body": body,
++ # "body_json": dict(parse_qsl(request.body)),
++ }
++ return (200, self.common_headers, json.dumps(resp))
++
++ def _cb_password_auth(self, request):
++ form_data_dict = dict(parse_qsl(request.body))
++
++ # if this user should not be authenticated
++ if form_data_dict.get("username") == "bad_auth":
++ return (
++ 401,
++ self.common_headers,
++ json.dumps({"data": None}),
++ )
++ # if this user requires OTP and it is not included
++ if form_data_dict.get("username") == "otp" and form_data_dict.get("otp") is None:
++ return (
++ 200,
++ self.common_headers,
++ json.dumps(
++ {
++ "data": {
++ "ticket": "otp_ticket",
++ "CSRFPreventionToken": "CSRFPreventionToken",
++ "NeedTFA": 1,
++ }
++ }
++ ),
++ )
++
++ # if this is the first ticket
++ if form_data_dict.get("password") != "ticket":
++ return (
++ 200,
++ self.common_headers,
++ json.dumps(
++ {"data": {"ticket": "ticket", "CSRFPreventionToken": "CSRFPreventionToken"}}
++ ),
++ )
++ # if this is refreshing the ticket, return new ticket
++ else:
++ return (
++ 200,
++ self.common_headers,
++ json.dumps(
++ {
++ "data": {
++ "ticket": "new_ticket",
++ "CSRFPreventionToken": "CSRFPreventionToken_2",
++ }
++ }
++ ),
++ )
++
++ def _cb_task_status(self, request):
++ resp = {}
++ if "keep-running" in request.url:
++ resp = {
++ "data": {
++ "id": "110",
++ "pid": 1044989,
++ "node": "node1",
++ "pstart": 284768076,
++ "status": "running",
++ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running",
++ "starttime": 1661825068,
++ "user": "root@pam",
++ "type": "vzdump",
++ }
++ }
++
++ elif "stopped" in request.url:
++ resp = {
++ "data": {
++ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped",
++ "starttime": 1661825068,
++ "user": "root@pam",
++ "type": "vzdump",
++ "pstart": 284768076,
++ "status": "stopped",
++ "exitstatus": "interrupted by signal",
++ "pid": 1044989,
++ "id": "110",
++ "node": "node1",
++ }
++ }
++
++ elif "done" in request.url:
++ resp = {
++ "data": {
++ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
++ "starttime": 1661825068,
++ "user": "root@pam",
++ "type": "vzdump",
++ "pstart": 284768076,
++ "status": "stopped",
++ "exitstatus": "OK",
++ "pid": 1044989,
++ "id": "110",
++ "node": "node1",
++ }
++ }
++
++ elif "comment" in request.url:
++ resp = {
++ "data": {
++ "upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment",
++ "node": "node",
++ "pid": 0,
++ "pstart": 0,
++ "starttime": 0,
++ "type": "task",
++ "id": "id",
++ "user": "root@pam",
++ "status": "stopped",
++ "exitstatus": "OK",
++ }
++ }
++
++ return (200, self.common_headers, json.dumps(resp))
++
++ def _cb_url_metadata(self, request):
++ form_data_dict = dict(parse_qsl((urlparse(request.url)).query))
++
++ if "file.iso" in form_data_dict.get("url", ""):
++ return (
++ 200,
++ self.common_headers,
++ json.dumps(
++ {
++ "data": {
++ "size": 123456,
++ "filename": "file.iso",
++ "mimetype": "application/x-iso9660-image",
++ # "mimetype": "application/octet-stream",
++ },
++ "success": 1,
++ }
++ ),
++ )
++ elif "invalid.iso" in form_data_dict.get("url", ""):
++ return (
++ 500,
++ self.common_headers,
++ json.dumps(
++ {
++ "status": 500,
++ "message": "invalid server response: '500 Can't connect to sub.domain.tld:443 (certificate verify failed)'\n",
++ "success": 0,
++ "data": None,
++ }
++ ),
++ )
++ elif "missing.iso" in form_data_dict.get("url", ""):
++ return (
++ 500,
++ self.common_headers,
++ json.dumps(
++ {
++ "status": 500,
++ "success": 0,
++ "message": "invalid server response: '404 Not Found'\n",
++ "data": None,
++ }
++ ),
++ )
++
++ elif "index.html" in form_data_dict.get("url", ""):
++ return (
++ 200,
++ self.common_headers,
++ json.dumps(
++ {
++ "success": 1,
++ "data": {"filename": "index.html", "mimetype": "text/html", "size": 17664},
++ }
++ ),
++ )
++
++ def _cb_qemu_monitor(self, request):
++ body = request.body
++ if body is not None:
++ body = body if isinstance(body, str) else str(body, "utf-8")
++
++ # if the command is an array, throw the type error PVE would throw
++ if "&" in body:
++ return (
++ 400,
++ self.common_headers,
++ json.dumps(
++ {
++ "data": None,
++ "errors": {"command": "type check ('string') failed - got ARRAY"},
++ }
++ ),
++ )
++ else:
++ resp = {
++ "method": request.method,
++ "url": request.url,
++ "headers": dict(request.headers),
++ "cookies": request._cookies.get_dict(),
++ "body": body,
++ # "body_json": dict(parse_qsl(request.body)),
++ }
++ print(resp)
++ return (200, self.common_headers, json.dumps(resp))
+diff -ruN tests/files_mock.py proxmoxer-2.2.0/tests/files_mock.py
+--- tests/files_mock.py 1970-01-01 01:00:00.000000000 +0100
++++ proxmoxer-2.2.0/tests/files_mock.py 2024-12-15 02:12:42.000000000 +0000
+@@ -0,0 +1,127 @@
++__author__ = "John Hollowell"
++__copyright__ = "(c) John Hollowell 2022"
++__license__ = "MIT"
++
++import re
++
++import pytest
++import responses
++from requests import exceptions
++
++from .api_mock import PVERegistry
++
++
++@pytest.fixture()
++def mock_files():
++ with responses.RequestsMock(
++ registry=FilesRegistry, assert_all_requests_are_fired=False
++ ) as rsps:
++ yield rsps
++
++
++class FilesRegistry(responses.registries.FirstMatchRegistry):
++ base_url = "https://sub.domain.tld"
++
++ common_headers = {
++ "Cache-Control": "max-age=0",
++ "Connection": "close, Keep-Alive",
++ "Pragma": "no-cache",
++ "Server": "pve-api-daemon/3.0",
++ "Content-Type": "application/json;charset=UTF-8",
++ }
++
++ def __init__(self):
++ super().__init__()
++ for resp in self._generate_static_responses():
++ self.add(resp)
++
++ def _generate_static_responses(self):
++ resps = []
++
++ # Basic GET requests
++ resps.append(responses.Response(method="GET", url=self.base_url, body="hello world"))
++ resps.append(
++ responses.Response(method="GET", url=self.base_url + "/file.iso", body="CONTENTS")
++ )
++
++ # sibling
++ resps.append(
++ responses.Response(
++ method="GET", url=self.base_url + "/sibling/file.iso", body="CONTENTS\n"
++ )
++ )
++ resps.append(
++ responses.Response(
++ method="GET",
++ url=self.base_url + "/sibling/TESTINGSUMS",
++ body="this_is_the_hash file.iso",
++ )
++ )
++
++ # extension
++ resps.append(
++ responses.Response(
++ method="GET", url=self.base_url + "/extension/file.iso", body="CONTENTS\n"
++ )
++ )
++ resps.append(
++ responses.Response(
++ method="GET",
++ url=self.base_url + "/extension/file.iso.testing",
++ body="this_is_the_hash file.iso",
++ )
++ )
++ resps.append(
++ responses.Response(
++ method="GET",
++ url=self.base_url + "/extension/connectionerror.iso.testing",
++ body=exceptions.ConnectionError(),
++ )
++ )
++ resps.append(
++ responses.Response(
++ method="GET",
++ url=self.base_url + "/extension/readtimeout.iso.testing",
++ body=exceptions.ReadTimeout(),
++ )
++ )
++
++ # extension upper
++ resps.append(
++ responses.Response(
++ method="GET", url=self.base_url + "/upper/file.iso", body="CONTENTS\n"
++ )
++ )
++ resps.append(
++ responses.Response(
++ method="GET",
++ url=self.base_url + "/upper/file.iso.TESTING",
++ body="this_is_the_hash file.iso",
++ )
++ )
++
++ resps.append(
++ responses.Response(
++ method="GET",
++ url=re.compile(self.base_url + r"/checksums/file.iso.\w+"),
++ body="1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890 file.iso",
++ )
++ )
++
++ return resps
++
++
++@pytest.fixture()
++def mock_files_and_pve():
++ with responses.RequestsMock(registry=BothRegistry, assert_all_requests_are_fired=False) as rsps:
++ yield rsps
++
++
++class BothRegistry(responses.registries.FirstMatchRegistry):
++ def __init__(self):
++ super().__init__()
++ registries = [FilesRegistry(), PVERegistry()]
++
++ for reg in registries:
++ for resp in reg.registered:
++ self.add(resp)
+diff -ruN tests/tools/__init__.py proxmoxer-2.2.0/tests/tools/__init__.py
+--- tests/tools/__init__.py 1970-01-01 01:00:00.000000000 +0100
++++ proxmoxer-2.2.0/tests/tools/__init__.py 2024-12-15 02:12:42.000000000 +0000
+@@ -0,0 +1,3 @@
++__author__ = "John Hollowell"
++__copyright__ = "(c) John Hollowell 2022"
++__license__ = "MIT"
+diff -ruN tests/tools/test_files.py proxmoxer-2.2.0/tests/tools/test_files.py
+--- tests/tools/test_files.py 1970-01-01 01:00:00.000000000 +0100
++++ proxmoxer-2.2.0/tests/tools/test_files.py 2024-12-15 02:12:42.000000000 +0000
+@@ -0,0 +1,375 @@
++__author__ = "John Hollowell"
++__copyright__ = "(c) John Hollowell 2023"
++__license__ = "MIT"
++
++import logging
++import tempfile
++from unittest import mock
++
++import pytest
++
++from proxmoxer import ProxmoxAPI, core
++from proxmoxer.tools import ChecksumInfo, Files, SupportedChecksums
++
++from ..api_mock import mock_pve # pylint: disable=unused-import # noqa: F401
++from ..files_mock import ( # pylint: disable=unused-import # noqa: F401
++ mock_files,
++ mock_files_and_pve,
++)
++
++MODULE_LOGGER_NAME = "proxmoxer.tools.files"
++
++
++class TestChecksumInfo:
++ def test_basic(self):
++ info = ChecksumInfo("name", 123)
++
++ assert info.name == "name"
++ assert info.hex_size == 123
++
++ def test_str(self):
++ info = ChecksumInfo("name", 123)
++
++ assert str(info) == "name"
++
++ def test_repr(self):
++ info = ChecksumInfo("name", 123)
++
++ assert repr(info) == "name (123 digits)"
++
++
++class TestGetChecksum:
++ def test_get_checksum_from_sibling_file_success(self, mock_files):
++ url = "https://sub.domain.tld/sibling/file.iso"
++ exp_hash = "this_is_the_hash"
++ info = ChecksumInfo("testing", 16)
++ res1 = Files._get_checksum_from_sibling_file(url, checksum_info=info)
++ res2 = Files._get_checksum_from_sibling_file(url, checksum_info=info, filename="file.iso")
++
++ assert res1 == exp_hash
++ assert res2 == exp_hash
++
++ def test_get_checksum_from_sibling_file_fail(self, mock_files):
++ url = "https://sub.domain.tld/sibling/missing.iso"
++ info = ChecksumInfo("testing", 16)
++ res1 = Files._get_checksum_from_sibling_file(url, checksum_info=info)
++ res2 = Files._get_checksum_from_sibling_file(
++ url, checksum_info=info, filename="missing.iso"
++ )
++
++ assert res1 is None
++ assert res2 is None
++
++ def test_get_checksum_from_extension_success(self, mock_files):
++ url = "https://sub.domain.tld/extension/file.iso"
++ exp_hash = "this_is_the_hash"
++ info = ChecksumInfo("testing", 16)
++ res1 = Files._get_checksum_from_extension(url, checksum_info=info)
++ res2 = Files._get_checksum_from_extension(url, checksum_info=info, filename="file.iso")
++
++ assert res1 == exp_hash
++ assert res2 == exp_hash
++
++ def test_get_checksum_from_extension_fail(self, mock_files):
++ url = "https://sub.domain.tld/extension/missing.iso"
++
++ info = ChecksumInfo("testing", 16)
++ res1 = Files._get_checksum_from_extension(url, checksum_info=info)
++ res2 = Files._get_checksum_from_extension(
++ url, checksum_info=info, filename="connectionerror.iso"
++ )
++ res3 = Files._get_checksum_from_extension(
++ url, checksum_info=info, filename="readtimeout.iso"
++ )
++
++ assert res1 is None
++ assert res2 is None
++ assert res3 is None
++
++ def test_get_checksum_from_extension_upper_success(self, mock_files):
++ url = "https://sub.domain.tld/upper/file.iso"
++ exp_hash = "this_is_the_hash"
++ info = ChecksumInfo("testing", 16)
++ res1 = Files._get_checksum_from_extension_upper(url, checksum_info=info)
++ res2 = Files._get_checksum_from_extension_upper(
++ url, checksum_info=info, filename="file.iso"
++ )
++
++ assert res1 == exp_hash
++ assert res2 == exp_hash
++
++ def test_get_checksum_from_extension_upper_fail(self, mock_files):
++ url = "https://sub.domain.tld/upper/missing.iso"
++ info = ChecksumInfo("testing", 16)
++ res1 = Files._get_checksum_from_extension_upper(url, checksum_info=info)
++ res2 = Files._get_checksum_from_extension_upper(
++ url, checksum_info=info, filename="missing.iso"
++ )
++
++ assert res1 is None
++ assert res2 is None
++
++ def test_get_checksums_from_file_url_all_checksums(self, mock_files):
++ base_url = "https://sub.domain.tld/checksums/file.iso"
++ full_checksum_string = "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
++ for types_enum in SupportedChecksums:
++ checksum_info = types_enum.value
++
++ data = Files.get_checksums_from_file_url(base_url, preferred_type=checksum_info)
++
++ assert data[0] == full_checksum_string[0 : checksum_info.hex_size]
++ assert data[1] == checksum_info
++
++ def test_get_checksums_from_file_url_missing(self, mock_files):
++ url = "https://sub.domain.tld/missing.iso"
++
++ data = Files.get_checksums_from_file_url(url)
++
++ assert data[0] is None
++ assert data[1] is None
++
++
++class TestFiles:
++ prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")
++
++ def test_init_basic(self):
++ f = Files(self.prox, "node1", "storage1")
++
++ assert f._prox == self.prox
++ assert f._node == "node1"
++ assert f._storage == "storage1"
++
++ def test_repr(self):
++ f = Files(self.prox, "node1", "storage1")
++ assert (
++ repr(f)
++ == "Files (node1/storage1 at ProxmoxAPI (https backend for https://1.2.3.4:1234/api2/json))"
++ )
++
++ def test_get_file_info_pass(self, mock_pve):
++ f = Files(self.prox, "node1", "storage1")
++ info = f.get_file_info("https://sub.domain.tld/file.iso")
++
++ assert info["filename"] == "file.iso"
++ assert info["mimetype"] == "application/x-iso9660-image"
++ assert info["size"] == 123456
++
++ def test_get_file_info_fail(self, mock_pve):
++ f = Files(self.prox, "node1", "storage1")
++ info = f.get_file_info("https://sub.domain.tld/invalid.iso")
++
++ assert info is None
++
++
++class TestFilesDownload:
++ prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")
++ f = Files(prox, "node1", "storage1")
++
++ def test_download_discover_checksum(self, mock_files_and_pve, caplog):
++ status = self.f.download_file_to_storage("https://sub.domain.tld/checksums/file.iso")
++
++ # this is the default "done" task mock information
++ assert status == {
++ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
++ "starttime": 1661825068,
++ "user": "root@pam",
++ "type": "vzdump",
++ "pstart": 284768076,
++ "status": "stopped",
++ "exitstatus": "OK",
++ "pid": 1044989,
++ "id": "110",
++ "node": "node1",
++ }
++ assert caplog.record_tuples == []
++
++ def test_download_no_blocking(self, mock_files_and_pve, caplog):
++ status = self.f.download_file_to_storage(
++ "https://sub.domain.tld/checksums/file.iso", blocking_status=False
++ )
++
++ # this is the default "done" task mock information
++ assert status == {
++ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
++ "starttime": 1661825068,
++ "user": "root@pam",
++ "type": "vzdump",
++ "pstart": 284768076,
++ "status": "stopped",
++ "exitstatus": "OK",
++ "pid": 1044989,
++ "id": "110",
++ "node": "node1",
++ }
++ assert caplog.record_tuples == []
++
++ def test_download_no_discover_checksum(self, mock_files_and_pve, caplog):
++ caplog.set_level(logging.WARNING, logger=MODULE_LOGGER_NAME)
++
++ status = self.f.download_file_to_storage("https://sub.domain.tld/file.iso")
++
++ # this is the default "stopped" task mock information
++ assert status == {
++ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
++ "starttime": 1661825068,
++ "user": "root@pam",
++ "type": "vzdump",
++ "pstart": 284768076,
++ "status": "stopped",
++ "exitstatus": "OK",
++ "pid": 1044989,
++ "id": "110",
++ "node": "node1",
++ }
++ assert caplog.record_tuples == [
++ (
++ MODULE_LOGGER_NAME,
++ logging.WARNING,
++ "Unable to discover checksum. Will not do checksum validation",
++ ),
++ ]
++
++ def test_uneven_checksum(self, caplog, mock_files_and_pve):
++ caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
++ status = self.f.download_file_to_storage("https://sub.domain.tld/file.iso", checksum="asdf")
++
++ assert status is None
++
++ assert caplog.record_tuples == [
++ (
++ MODULE_LOGGER_NAME,
++ logging.ERROR,
++ "Must pass both checksum and checksum_type or leave both None for auto-discovery",
++ ),
++ ]
++
++ def test_uneven_checksum_type(self, caplog, mock_files_and_pve):
++ caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
++ status = self.f.download_file_to_storage(
++ "https://sub.domain.tld/file.iso", checksum_type="asdf"
++ )
++
++ assert status is None
++
++ assert caplog.record_tuples == [
++ (
++ MODULE_LOGGER_NAME,
++ logging.ERROR,
++ "Must pass both checksum and checksum_type or leave both None for auto-discovery",
++ ),
++ ]
++
++ def test_get_file_info_missing(self, mock_pve):
++ f = Files(self.prox, "node1", "storage1")
++ info = f.get_file_info("https://sub.domain.tld/missing.iso")
++
++ assert info is None
++
++ def test_get_file_info_non_iso(self, mock_pve):
++ f = Files(self.prox, "node1", "storage1")
++ info = f.get_file_info("https://sub.domain.tld/index.html")
++
++ assert info["filename"] == "index.html"
++ assert info["mimetype"] == "text/html"
++
++
++class TestFilesUpload:
++ prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")
++ f = Files(prox, "node1", "storage1")
++
++ def test_upload_no_file(self, mock_files_and_pve, caplog):
++ status = self.f.upload_local_file_to_storage("/does-not-exist.iso")
++
++ assert status is None
++ assert caplog.record_tuples == [
++ (
++ MODULE_LOGGER_NAME,
++ logging.ERROR,
++ '"/does-not-exist.iso" does not exist or is not a file',
++ ),
++ ]
++
++ def test_upload_dir(self, mock_files_and_pve, caplog):
++ with tempfile.TemporaryDirectory() as tmp_dir:
++ status = self.f.upload_local_file_to_storage(tmp_dir)
++
++ assert status is None
++ assert caplog.record_tuples == [
++ (
++ MODULE_LOGGER_NAME,
++ logging.ERROR,
++ f'"{tmp_dir}" does not exist or is not a file',
++ ),
++ ]
++
++ def test_upload_empty_file(self, mock_files_and_pve, caplog):
++ with tempfile.NamedTemporaryFile("rb") as f_obj:
++ status = self.f.upload_local_file_to_storage(filename=f_obj.name)
++
++ assert status is not None
++ assert caplog.record_tuples == []
++
++ def test_upload_non_empty_file(self, mock_files_and_pve, caplog):
++ with tempfile.NamedTemporaryFile("w+b") as f_obj:
++ f_obj.write(b"a" * 100)
++ f_obj.seek(0)
++ status = self.f.upload_local_file_to_storage(filename=f_obj.name)
++
++ assert status is not None
++ assert caplog.record_tuples == []
++
++ def test_upload_no_checksum(self, mock_files_and_pve, caplog):
++ with tempfile.NamedTemporaryFile("rb") as f_obj:
++ status = self.f.upload_local_file_to_storage(
++ filename=f_obj.name, do_checksum_check=False
++ )
++
++ assert status is not None
++ assert caplog.record_tuples == []
++
++ def test_upload_checksum_unavailable(self, mock_files_and_pve, caplog, apply_no_checksums):
++ with tempfile.NamedTemporaryFile("rb") as f_obj:
++ status = self.f.upload_local_file_to_storage(filename=f_obj.name)
++
++ assert status is not None
++ assert caplog.record_tuples == [
++ (
++ MODULE_LOGGER_NAME,
++ logging.WARNING,
++ "There are no Proxmox supported checksums which are supported by hashlib. Skipping checksum validation",
++ )
++ ]
++
++ def test_upload_non_blocking(self, mock_files_and_pve, caplog):
++ with tempfile.NamedTemporaryFile("rb") as f_obj:
++ status = self.f.upload_local_file_to_storage(filename=f_obj.name, blocking_status=False)
++
++ assert status is not None
++ assert caplog.record_tuples == []
++
++ def test_upload_proxmox_error(self, mock_files_and_pve, caplog):
++ with tempfile.NamedTemporaryFile("rb") as f_obj:
++ f_copy = Files(self.f._prox, self.f._node, "missing")
++
++ with pytest.raises(core.ResourceException) as exc_info:
++ f_copy.upload_local_file_to_storage(filename=f_obj.name)
++
++ assert exc_info.value.status_code == 500
++ assert exc_info.value.status_message == "Internal Server Error"
++ # assert exc_info.value.content == "storage 'missing' does not exist"
++
++ def test_upload_io_error(self, mock_files_and_pve, caplog):
++ with tempfile.NamedTemporaryFile("rb") as f_obj:
++ mo = mock.mock_open()
++ mo.side_effect = IOError("ERROR MESSAGE")
++ with mock.patch("builtins.open", mo):
++ status = self.f.upload_local_file_to_storage(filename=f_obj.name)
++
++ assert status is None
++ assert caplog.record_tuples == [(MODULE_LOGGER_NAME, logging.ERROR, "ERROR MESSAGE")]
++
++
++@pytest.fixture
++def apply_no_checksums():
++ with mock.patch("hashlib.algorithms_available", set()):
++ yield
+diff -ruN tests/tools/test_tasks.py proxmoxer-2.2.0/tests/tools/test_tasks.py
+--- tests/tools/test_tasks.py 1970-01-01 01:00:00.000000000 +0100
++++ proxmoxer-2.2.0/tests/tools/test_tasks.py 2024-12-15 02:12:42.000000000 +0000
+@@ -0,0 +1,223 @@
++__author__ = "John Hollowell"
++__copyright__ = "(c) John Hollowell 2022"
++__license__ = "MIT"
++
++import logging
++
++import pytest
++
++from proxmoxer import ProxmoxAPI
++from proxmoxer.tools import Tasks
++
++from ..api_mock import mock_pve # pylint: disable=unused-import # noqa: F401
++
++
++class TestBlockingStatus:
++ def test_basic(self, mocked_prox, caplog):
++ caplog.set_level(logging.DEBUG, logger="proxmoxer.core")
++
++ status = Tasks.blocking_status(
++ mocked_prox, "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done"
++ )
++
++ assert status == {
++ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
++ "starttime": 1661825068,
++ "user": "root@pam",
++ "type": "vzdump",
++ "pstart": 284768076,
++ "status": "stopped",
++ "exitstatus": "OK",
++ "pid": 1044989,
++ "id": "110",
++ "node": "node1",
++ }
++ assert caplog.record_tuples == [
++ (
++ "proxmoxer.core",
++ 20,
++ "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done/status",
++ ),
++ (
++ "proxmoxer.core",
++ 10,
++ 'Status code: 200, output: b\'{"data": {"upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "OK", "pid": 1044989, "id": "110", "node": "node1"}}\'',
*** 179 LINES SKIPPED ***