diff --git a/nc_py_api/_exceptions.py b/nc_py_api/_exceptions.py index c0c4551c..3a58814c 100644 --- a/nc_py_api/_exceptions.py +++ b/nc_py_api/_exceptions.py @@ -1,6 +1,6 @@ """Exceptions for the Nextcloud API.""" -from httpx import codes +from httpx import Response, codes class NextcloudException(Exception): @@ -42,21 +42,24 @@ def __init__(self, reason="Missing capability", info: str = ""): super().__init__(412, reason=reason, info=info) -def check_error(code: int, info: str = ""): +def check_error(response: Response, info: str = ""): """Checks HTTP code from Nextcloud, and raises exception in case of error. For the OCS and DAV `code` be code returned by HTTP and not the status from ``ocs_meta``. """ - if 996 <= code <= 999: - if code == 996: + status_code = response.status_code + if not info: + info = f"request: {response.request.method} {response.request.url}" + if 996 <= status_code <= 999: + if status_code == 996: phrase = "Server error" - elif code == 997: + elif status_code == 997: phrase = "Unauthorised" - elif code == 998: + elif status_code == 998: phrase = "Not found" else: phrase = "Unknown error" - raise NextcloudException(code, reason=phrase, info=info) - if not codes.is_error(code): + raise NextcloudException(status_code, reason=phrase, info=info) + if not codes.is_error(status_code): return - raise NextcloudException(code, reason=codes(code).phrase, info=info) + raise NextcloudException(status_code, reason=codes(status_code).phrase, info=info) diff --git a/nc_py_api/_preferences.py b/nc_py_api/_preferences.py index a0777e7c..2c2ff0d7 100644 --- a/nc_py_api/_preferences.py +++ b/nc_py_api/_preferences.py @@ -20,9 +20,9 @@ def available(self) -> bool: def set_value(self, app_name: str, key: str, value: str) -> None: """Sets the value for the key for the specific application.""" require_capabilities("provisioning_api", self._session.capabilities) - self._session.ocs(method="POST", path=f"{self._ep_base}/{app_name}/{key}", params={"configValue": value}) + self._session.ocs("POST", f"{self._ep_base}/{app_name}/{key}", params={"configValue": value}) def delete(self, app_name: str, key: str) -> None: """Removes a key and its value for a specific application.""" require_capabilities("provisioning_api", self._session.capabilities) - self._session.ocs(method="DELETE", path=f"{self._ep_base}/{app_name}/{key}") + self._session.ocs("DELETE", f"{self._ep_base}/{app_name}/{key}") diff --git a/nc_py_api/_preferences_ex.py b/nc_py_api/_preferences_ex.py index 93d0f3d1..f5d0c72d 100644 --- a/nc_py_api/_preferences_ex.py +++ b/nc_py_api/_preferences_ex.py @@ -44,9 +44,7 @@ def get_values(self, keys: list[str]) -> list[CfgRecord]: raise ValueError("`key` parameter can not be empty") require_capabilities("app_api", self._session.capabilities) data = {"configKeys": keys} - results = self._session.ocs( - method="POST", path=f"{self._session.ae_url}/{self._url_suffix}/get-values", json=data - ) + results = self._session.ocs("POST", f"{self._session.ae_url}/{self._url_suffix}/get-values", json=data) return [CfgRecord(i) for i in results] def delete(self, keys: typing.Union[str, list[str]], not_fail=True) -> None: @@ -59,9 +57,7 @@ def delete(self, keys: typing.Union[str, list[str]], not_fail=True) -> None: raise ValueError("`key` parameter can not be empty") require_capabilities("app_api", self._session.capabilities) try: - self._session.ocs( - method="DELETE", path=f"{self._session.ae_url}/{self._url_suffix}", json={"configKeys": keys} - ) + self._session.ocs("DELETE", f"{self._session.ae_url}/{self._url_suffix}", json={"configKeys": keys}) except NextcloudExceptionNotFound as e: if not not_fail: raise e from None @@ -78,7 +74,7 @@ def set_value(self, key: str, value: str) -> None: raise ValueError("`key` parameter can not be empty") require_capabilities("app_api", self._session.capabilities) params = {"configKey": key, "configValue": value} - self._session.ocs(method="POST", path=f"{self._session.ae_url}/{self._url_suffix}", json=params) + self._session.ocs("POST", f"{self._session.ae_url}/{self._url_suffix}", json=params) class AppConfigExAPI(_BasicAppCfgPref): @@ -99,4 +95,4 @@ def set_value(self, key: str, value: str, sensitive: typing.Optional[bool] = Non params: dict = {"configKey": key, "configValue": value} if sensitive is not None: params["sensitive"] = sensitive - self._session.ocs(method="POST", path=f"{self._session.ae_url}/{self._url_suffix}", json=params) + self._session.ocs("POST", f"{self._session.ae_url}/{self._url_suffix}", json=params) diff --git a/nc_py_api/_session.py b/nc_py_api/_session.py index 0b112390..d85e0be4 100644 --- a/nc_py_api/_session.py +++ b/nc_py_api/_session.py @@ -1,20 +1,16 @@ """Session represents one connection to Nextcloud. All related stuff for these live here.""" +import re import typing from abc import ABC, abstractmethod from base64 import b64encode -from collections.abc import Iterator from dataclasses import dataclass from enum import IntEnum -from json import dumps, loads +from json import loads from os import environ -from typing import Optional, TypedDict, Union -from urllib.parse import quote, urlencode -from fastapi import Request -from httpx import Client -from httpx import Headers as HttpxHeaders -from httpx import Limits, ReadTimeout, Response +from fastapi import Request as FastAPIRequest +from httpx import Client, Headers, Limits, ReadTimeout, Request, Response from . import options from ._exceptions import ( @@ -35,7 +31,7 @@ class OCSRespond(IntEnum): RESPOND_UNKNOWN_ERROR = 999 -class ServerVersion(TypedDict): +class ServerVersion(typing.TypedDict): """Nextcloud version information.""" major: int @@ -53,9 +49,9 @@ class ServerVersion(TypedDict): @dataclass class RuntimeOptions: xdebug_session: str - timeout: Optional[int] - timeout_dav: Optional[int] - _nc_cert: Union[str, bool] + timeout: typing.Optional[int] + timeout_dav: typing.Optional[int] + _nc_cert: typing.Union[str, bool] upload_chunk_v2: bool def __init__(self, **kwargs): @@ -66,7 +62,7 @@ def __init__(self, **kwargs): self.upload_chunk_v2 = kwargs.get("chunked_upload_v2", options.CHUNKED_UPLOAD_V2) @property - def nc_cert(self) -> Union[str, bool]: + def nc_cert(self) -> typing.Union[str, bool]: return self._nc_cert @@ -136,7 +132,7 @@ class NcSessionBasic(ABC): adapter_dav: Client cfg: BasicConfig custom_headers: dict - response_headers: HttpxHeaders + response_headers: Headers _user: str _capabilities: dict @@ -148,7 +144,8 @@ def __init__(self, **kwargs): self.limits = Limits(max_keepalive_connections=20, max_connections=20, keepalive_expiry=60.0) self.init_adapter() self.init_adapter_dav() - self.response_headers = HttpxHeaders() + self.response_headers = Headers() + self.__ocs_regexp = re.compile(r"/ocs/v[12]\.php/") def __del__(self): if hasattr(self, "adapter") and self.adapter: @@ -156,86 +153,25 @@ def __del__(self): if hasattr(self, "adapter_dav") and self.adapter_dav: self.adapter_dav.close() - def get_stream(self, path: str, params: Optional[dict] = None, **kwargs) -> Iterator[Response]: - headers = kwargs.pop("headers", {}) - return self._get_stream( - f"{quote(path)}?{urlencode(params, True)}" if params else quote(path), headers=headers, **kwargs - ) - - def _get_stream(self, path_params: str, headers: dict, **kwargs) -> Iterator[Response]: - self.init_adapter() - timeout = kwargs.pop("timeout", self.cfg.options.timeout) - return self.adapter.stream( - "GET", f"{self.cfg.endpoint}{path_params}", headers=headers, timeout=timeout, **kwargs - ) - - def request( - self, - method: str, - path: str, - params: Optional[dict] = None, - data: Optional[Union[bytes, str]] = None, - json: Optional[Union[dict, list]] = None, - **kwargs, - ): - method = method.upper() - if params is None: - params = {} - params.update({"format": "json"}) - headers = kwargs.pop("headers", {}) - data_bytes = self.__data_to_bytes(headers, data, json) - return self._ocs(method, f"{quote(path)}?{urlencode(params, True)}", headers, data_bytes, not_parse=True) - - def request_json( - self, - method: str, - path: str, - params: Optional[dict] = None, - data: Optional[Union[bytes, str]] = None, - json: Optional[Union[dict, list]] = None, - **kwargs, - ) -> dict: - r = self.request(method, path, params, data, json, **kwargs) - return loads(r.text) if r.status_code != 304 else {} - def ocs( self, method: str, path: str, - params: Optional[dict] = None, - data: Optional[Union[bytes, str]] = None, - json: Optional[Union[dict, list]] = None, + *, + content: typing.Optional[typing.Union[bytes, str, typing.Iterable[bytes], typing.AsyncIterable[bytes]]] = None, + json: typing.Optional[typing.Union[dict, list]] = None, + params: typing.Optional[dict] = None, **kwargs, ): - method = method.upper() - if params is None: - params = {} - params.update({"format": "json"}) - headers = kwargs.pop("headers", {}) - data_bytes = self.__data_to_bytes(headers, data, json) - return self._ocs(method, f"{quote(path)}?{urlencode(params, True)}", headers, data=data_bytes, **kwargs) - - def _ocs(self, method: str, path_params: str, headers: dict, data: Optional[bytes], **kwargs): self.init_adapter() - url_params = f"{self.cfg.endpoint}{path_params}" - info = f"request: method={method}, url={url_params}" + info = f"request: {method} {path}" nested_req = kwargs.pop("nested_req", False) - not_parse = kwargs.pop("not_parse", False) try: - timeout = kwargs.pop("timeout", self.cfg.options.timeout) - if method == "GET": - response = self.adapter.get(url_params, headers=headers, timeout=timeout, **kwargs) - else: - response = self.adapter.request( - method, url_params, headers=headers, content=data, timeout=timeout, **kwargs - ) + response = self.adapter.request(method, path, content=content, json=json, params=params, **kwargs) except ReadTimeout: raise NextcloudException(408, info=info) from None - self.response_headers = response.headers - check_error(response.status_code, info) - if not_parse: - return response + check_error(response, info) response_data = loads(response.text) ocs_meta = response_data["ocs"]["meta"] if ocs_meta["status"] != "ok": @@ -246,7 +182,7 @@ def _ocs(self, method: str, path_params: str, headers: dict, data: Optional[byte ): self.adapter.close() self.init_adapter(restart=True) - return self._ocs(method, path_params, headers, data, **kwargs, nested_req=True) + return self.ocs(method, path, **kwargs, content=content, json=json, params=params, nested_req=True) if ocs_meta["statuscode"] in (404, OCSRespond.RESPOND_NOT_FOUND): raise NextcloudExceptionNotFound(reason=ocs_meta["message"], info=info) if ocs_meta["statuscode"] == 304: @@ -254,54 +190,6 @@ def _ocs(self, method: str, path_params: str, headers: dict, data: Optional[byte raise NextcloudException(status_code=ocs_meta["statuscode"], reason=ocs_meta["message"], info=info) return response_data["ocs"]["data"] - def dav( - self, - method: str, - path: str, - data: Optional[Union[str, bytes]] = None, - json: Optional[Union[dict, list]] = None, - **kwargs, - ) -> Response: - headers = kwargs.pop("headers", {}) - data_bytes = self.__data_to_bytes(headers, data, json) - return self._dav( - method, - quote(self.cfg.dav_url_suffix + path) if isinstance(path, str) else path, - headers, - data_bytes, - **kwargs, - ) - - def dav_stream( - self, method: str, path: str, data: Optional[Union[str, bytes]] = None, **kwargs - ) -> Iterator[Response]: - headers = kwargs.pop("headers", {}) - data_bytes = None - if data is not None: - data_bytes = data.encode("UTF-8") if isinstance(data, str) else data - return self._dav_stream(method, quote(self.cfg.dav_url_suffix + path), headers, data_bytes, **kwargs) - - def _dav(self, method: str, path: str, headers: dict, data: Optional[bytes], **kwargs) -> Response: - self.init_adapter_dav() - timeout = kwargs.pop("timeout", self.cfg.options.timeout_dav) - result = self.adapter_dav.request( - method, - self.cfg.endpoint + path if isinstance(path, str) else str(path), - headers=headers, - content=data, - timeout=timeout, - **kwargs, - ) - self.response_headers = result.headers - return result - - def _dav_stream(self, method: str, path: str, headers: dict, data: Optional[bytes], **kwargs) -> Iterator[Response]: - self.init_adapter_dav() - timeout = kwargs.pop("timeout", self.cfg.options.timeout_dav) - return self.adapter_dav.stream( - method, self.cfg.endpoint + path, headers=headers, content=data, timeout=timeout, **kwargs - ) - def init_adapter(self, restart=False) -> None: if getattr(self, "adapter", None) is None or restart: if restart and hasattr(self, "adapter"): @@ -318,24 +206,24 @@ def init_adapter_dav(self, restart=False) -> None: if getattr(self, "adapter_dav", None) is None or restart: if restart and hasattr(self, "adapter"): self.adapter.close() - self.adapter_dav = self._create_adapter() + self.adapter_dav = self._create_adapter(dav=True) if self.custom_headers: self.adapter_dav.headers.update(self.custom_headers) if options.XDEBUG_SESSION: self.adapter_dav.cookies.set("XDEBUG_SESSION", options.XDEBUG_SESSION) @abstractmethod - def _create_adapter(self) -> Client: + def _create_adapter(self, dav: bool = False) -> Client: pass # pragma: no cover def update_server_info(self) -> None: - self._capabilities = self.ocs(method="GET", path="/ocs/v1.php/cloud/capabilities") + self._capabilities = self.ocs("GET", "/ocs/v1.php/cloud/capabilities") @property def user(self) -> str: """Current user ID. Can be different from the login name.""" if isinstance(self, NcSession) and not self._user: # do not trigger for NextcloudApp - self._user = self.ocs(method="GET", path="/ocs/v1.php/cloud/user")["id"] + self._user = self.ocs("GET", "/ocs/v1.php/cloud/user")["id"] return self._user @user.setter @@ -366,16 +254,31 @@ def ae_url(self) -> str: """Return base url for the App Ecosystem endpoints.""" return "/ocs/v1.php/apps/app_api/api/v1" - @staticmethod - def __data_to_bytes( - headers: dict, data: Optional[Union[bytes, str]] = None, json: Optional[Union[dict, list]] = None - ) -> typing.Optional[bytes]: - if data is not None: - return data.encode("UTF-8") if isinstance(data, str) else data - if json is not None: - headers.update({"Content-Type": "application/json"}) - return dumps(json).encode("utf-8") - return None + def _get_adapter_kwargs(self, dav: bool) -> dict[str, typing.Any]: + if dav: + return { + "base_url": self.cfg.dav_endpoint, + "timeout": self.cfg.options.timeout_dav, + "event_hooks": {"request": [], "response": [self._response_event]}, + } + return { + "base_url": self.cfg.endpoint, + "timeout": self.cfg.options.timeout, + "event_hooks": {"request": [self._request_event_ocs], "response": [self._response_event]}, + } + + def _request_event_ocs(self, request: Request) -> None: + str_url = str(request.url) + if re.search(self.__ocs_regexp, str_url) is not None: # this is OCS call + request.url = request.url.copy_merge_params({"format": "json"}) + + def _response_event(self, response: Response) -> None: + str_url = str(response.request.url) + # we do not want ResponseHeaders for those two endpoints, as call to them can occur during DAV calls. + for i in ("/ocs/v1.php/cloud/capabilities?format=json", "/ocs/v1.php/cloud/user?format=json"): + if str_url.endswith(i): + return + self.response_headers = response.headers class NcSession(NcSessionBasic): @@ -385,8 +288,14 @@ def __init__(self, **kwargs): self.cfg = Config(**kwargs) super().__init__() - def _create_adapter(self) -> Client: - return Client(auth=self.cfg.auth, follow_redirects=True, limits=self.limits, verify=self.cfg.options.nc_cert) + def _create_adapter(self, dav: bool = False) -> Client: + return Client( + follow_redirects=True, + limits=self.limits, + verify=self.cfg.options.nc_cert, + **self._get_adapter_kwargs(dav), + auth=self.cfg.auth, + ) class NcSessionApp(NcSessionBasic): @@ -396,35 +305,27 @@ def __init__(self, **kwargs): self.cfg = AppConfig(**kwargs) super().__init__(**kwargs) - def _get_stream(self, path_params: str, headers: dict, **kwargs) -> Iterator[Response]: - self.sign_request(headers) - return super()._get_stream(path_params, headers, **kwargs) - - def _ocs(self, method: str, path_params: str, headers: dict, data: Optional[bytes], **kwargs): - self.sign_request(headers) - return super()._ocs(method, path_params, headers, data, **kwargs) - - def _dav(self, method: str, path: str, headers: dict, data: Optional[bytes], **kwargs) -> Response: - self.sign_request(headers) - return super()._dav(method, path, headers, data, **kwargs) - - def _dav_stream(self, method: str, path: str, headers: dict, data: Optional[bytes], **kwargs) -> Iterator[Response]: - self.sign_request(headers) - return super()._dav_stream(method, path, headers, data, **kwargs) + def _create_adapter(self, dav: bool = False) -> Client: + r = self._get_adapter_kwargs(dav) + r["event_hooks"]["request"].append(self._add_auth) + return Client( + follow_redirects=True, + limits=self.limits, + verify=self.cfg.options.nc_cert, + **r, + headers={ + "AA-VERSION": self.cfg.aa_version, + "EX-APP-ID": self.cfg.app_name, + "EX-APP-VERSION": self.cfg.app_version, + }, + ) - def _create_adapter(self) -> Client: - adapter = Client(follow_redirects=True, limits=self.limits, verify=self.cfg.options.nc_cert) - adapter.headers.update({ - "AA-VERSION": self.cfg.aa_version, - "EX-APP-ID": self.cfg.app_name, - "EX-APP-VERSION": self.cfg.app_version, + def _add_auth(self, request: Request): + request.headers.update({ + "AUTHORIZATION-APP-API": b64encode(f"{self._user}:{self.cfg.app_secret}".encode("UTF=8")) }) - return adapter - - def sign_request(self, headers: dict) -> None: - headers.update({"AUTHORIZATION-APP-API": b64encode(f"{self._user}:{self.cfg.app_secret}".encode("UTF=8"))}) - def sign_check(self, request: Request) -> None: + def sign_check(self, request: FastAPIRequest) -> None: headers = { "AA-VERSION": request.headers.get("AA-VERSION", ""), "EX-APP-ID": request.headers.get("EX-APP-ID", ""), diff --git a/nc_py_api/_talk_api.py b/nc_py_api/_talk_api.py index 1a80a9f1..c8830e7e 100644 --- a/nc_py_api/_talk_api.py +++ b/nc_py_api/_talk_api.py @@ -3,6 +3,7 @@ import hashlib import typing +from ._exceptions import check_error from ._misc import ( check_capabilities, clear_from_params_empty, @@ -166,7 +167,7 @@ def set_conversation_password(self, conversation: typing.Union[Conversation, str .. note:: Password should match the password policy. """ token = conversation.token if isinstance(conversation, Conversation) else conversation - self._session.ocs("PUT", self._ep_base + f"/api/v4/room/{token}/password", {"password": password}) + self._session.ocs("PUT", self._ep_base + f"/api/v4/room/{token}/password", params={"password": password}) def set_conversation_readonly(self, conversation: typing.Union[Conversation, str], read_only: bool) -> None: """Changes conversation **read_only** state. @@ -175,7 +176,7 @@ def set_conversation_readonly(self, conversation: typing.Union[Conversation, str :param read_only: value to set for the ``read_only`` state. """ token = conversation.token if isinstance(conversation, Conversation) else conversation - self._session.ocs("PUT", self._ep_base + f"/api/v4/room/{token}/read-only", {"state": int(read_only)}) + self._session.ocs("PUT", self._ep_base + f"/api/v4/room/{token}/read-only", params={"state": int(read_only)}) def set_conversation_public(self, conversation: typing.Union[Conversation, str], public: bool) -> None: """Changes conversation **public** state. @@ -195,7 +196,7 @@ def set_conversation_notify_lvl( :param new_lvl: new value for notification level for the current user. """ token = conversation.token if isinstance(conversation, Conversation) else conversation - self._session.ocs("POST", self._ep_base + f"/api/v4/room/{token}/notify", {"level": int(new_lvl)}) + self._session.ocs("POST", self._ep_base + f"/api/v4/room/{token}/notify", json={"level": int(new_lvl)}) def get_conversation_by_token(self, conversation: typing.Union[Conversation, str]) -> Conversation: """Gets conversation by token.""" @@ -543,7 +544,8 @@ def get_conversation_avatar(self, conversation: typing.Union[Conversation, str], require_capabilities("spreed.features.avatar", self._session.capabilities) token = conversation.token if isinstance(conversation, Conversation) else conversation ep_suffix = "/dark" if dark else "" - response = self._session.ocs("GET", self._ep_base + f"/api/v1/room/{token}/avatar" + ep_suffix, not_parse=True) + response = self._session.adapter.get(self._ep_base + f"/api/v1/room/{token}/avatar" + ep_suffix) + check_error(response) return response.content @staticmethod diff --git a/nc_py_api/activity.py b/nc_py_api/activity.py index 37c5c1dd..0f646db6 100644 --- a/nc_py_api/activity.py +++ b/nc_py_api/activity.py @@ -112,7 +112,7 @@ def objects(self) -> dict: .. note:: They are stored in objects as key-value pairs of the object_id and the object_name: { object_id: object_name} """ - return self._raw_data["objects"] + return self._raw_data["objects"] if isinstance(self._raw_data["objects"], dict) else {} @property def link(self) -> str: diff --git a/nc_py_api/apps.py b/nc_py_api/apps.py index 9f41e724..f38cf940 100644 --- a/nc_py_api/apps.py +++ b/nc_py_api/apps.py @@ -66,7 +66,7 @@ def disable(self, app_id: str) -> None: """ if not app_id: raise ValueError("`app_id` parameter can not be empty") - self._session.ocs(method="DELETE", path=f"{self._ep_base}/{app_id}") + self._session.ocs("DELETE", f"{self._ep_base}/{app_id}") def enable(self, app_id: str) -> None: """Enables the application. @@ -75,7 +75,7 @@ def enable(self, app_id: str) -> None: """ if not app_id: raise ValueError("`app_id` parameter can not be empty") - self._session.ocs(method="POST", path=f"{self._ep_base}/{app_id}") + self._session.ocs("POST", f"{self._ep_base}/{app_id}") def get_list(self, enabled: typing.Optional[bool] = None) -> list[str]: """Get the list of installed applications. @@ -85,7 +85,7 @@ def get_list(self, enabled: typing.Optional[bool] = None) -> list[str]: params = None if enabled is not None: params = {"filter": "enabled" if enabled else "disabled"} - result = self._session.ocs(method="GET", path=self._ep_base, params=params) + result = self._session.ocs("GET", self._ep_base, params=params) return list(result["apps"].values()) if isinstance(result["apps"], dict) else result["apps"] def is_installed(self, app_id: str) -> bool: @@ -113,7 +113,7 @@ def ex_app_disable(self, app_id: str) -> None: """ if not app_id: raise ValueError("`app_id` parameter can not be empty") - self._session.ocs(method="PUT", path=f"{self._session.ae_url}/ex-app/{app_id}/enabled", json={"enabled": 0}) + self._session.ocs("PUT", f"{self._session.ae_url}/ex-app/{app_id}/enabled", json={"enabled": 0}) def ex_app_enable(self, app_id: str) -> None: """Enables the external application. @@ -122,7 +122,7 @@ def ex_app_enable(self, app_id: str) -> None: """ if not app_id: raise ValueError("`app_id` parameter can not be empty") - self._session.ocs(method="PUT", path=f"{self._session.ae_url}/ex-app/{app_id}/enabled", json={"enabled": 1}) + self._session.ocs("PUT", f"{self._session.ae_url}/ex-app/{app_id}/enabled", json={"enabled": 1}) def ex_app_get_list(self, enabled: bool = False) -> list[ExAppInfo]: """Gets information of the enabled external applications installed on the server. @@ -132,7 +132,7 @@ def ex_app_get_list(self, enabled: bool = False) -> list[ExAppInfo]: """ require_capabilities("app_api", self._session.capabilities) url_param = "enabled" if enabled else "all" - r = self._session.ocs(method="GET", path=f"{self._session.ae_url}/ex-app/{url_param}") + r = self._session.ocs("GET", f"{self._session.ae_url}/ex-app/{url_param}") return [ExAppInfo(i) for i in r] def ex_app_is_enabled(self, app_id: str) -> bool: diff --git a/nc_py_api/calendar.py b/nc_py_api/calendar.py index d789869d..0832591b 100644 --- a/nc_py_api/calendar.py +++ b/nc_py_api/calendar.py @@ -22,7 +22,9 @@ def request(self, url, method="GET", body="", headers={}): # noqa pylint: disab body = body.encode("UTF-8") if body: body = body.replace(b"\n", b"\r\n").replace(b"\r\r\n", b"\r\n") - r = self._session.dav(method, url, data=body, headers=headers) + r = self._session.adapter_dav.request( + method, url if isinstance(url, str) else str(url), content=body, headers=headers + ) return DAVResponse(r) except ImportError: diff --git a/nc_py_api/ex_app/ui/files_actions.py b/nc_py_api/ex_app/ui/files_actions.py index 40d4d595..cf1475cd 100644 --- a/nc_py_api/ex_app/ui/files_actions.py +++ b/nc_py_api/ex_app/ui/files_actions.py @@ -139,13 +139,13 @@ def register(self, name: str, display_name: str, callback_url: str, **kwargs) -> "permissions": kwargs.get("permissions", 31), "order": kwargs.get("order", 0), } - self._session.ocs(method="POST", path=f"{self._session.ae_url}/{self._ep_suffix}", json=params) + self._session.ocs("POST", f"{self._session.ae_url}/{self._ep_suffix}", json=params) def unregister(self, name: str, not_fail=True) -> None: """Removes files dropdown menu element.""" require_capabilities("app_api", self._session.capabilities) try: - self._session.ocs(method="DELETE", path=f"{self._session.ae_url}/{self._ep_suffix}", json={"name": name}) + self._session.ocs("DELETE", f"{self._session.ae_url}/{self._ep_suffix}", json={"name": name}) except NextcloudExceptionNotFound as e: if not not_fail: raise e from None @@ -155,7 +155,7 @@ def get_entry(self, name: str) -> typing.Optional[UiFileActionEntry]: require_capabilities("app_api", self._session.capabilities) try: return UiFileActionEntry( - self._session.ocs(method="GET", path=f"{self._session.ae_url}/{self._ep_suffix}", params={"name": name}) + self._session.ocs("GET", f"{self._session.ae_url}/{self._ep_suffix}", params={"name": name}) ) except NextcloudExceptionNotFound: return None diff --git a/nc_py_api/ex_app/ui/resources.py b/nc_py_api/ex_app/ui/resources.py index 3c7714d3..aae0df7e 100644 --- a/nc_py_api/ex_app/ui/resources.py +++ b/nc_py_api/ex_app/ui/resources.py @@ -96,15 +96,15 @@ def set_initial_state(self, ui_type: str, name: str, key: str, value: typing.Uni "key": key, "value": value, } - self._session.ocs(method="POST", path=f"{self._session.ae_url}/{self._ep_suffix_init_state}", json=params) + self._session.ocs("POST", f"{self._session.ae_url}/{self._ep_suffix_init_state}", json=params) def delete_initial_state(self, ui_type: str, name: str, key: str, not_fail=True) -> None: """Removes initial state for the page(template) by object name.""" require_capabilities("app_api", self._session.capabilities) try: self._session.ocs( - method="DELETE", - path=f"{self._session.ae_url}/{self._ep_suffix_init_state}", + "DELETE", + f"{self._session.ae_url}/{self._ep_suffix_init_state}", params={"type": ui_type, "name": name, "key": key}, ) except NextcloudExceptionNotFound as e: @@ -117,8 +117,8 @@ def get_initial_state(self, ui_type: str, name: str, key: str) -> typing.Optiona try: return UiInitState( self._session.ocs( - method="GET", - path=f"{self._session.ae_url}/{self._ep_suffix_init_state}", + "GET", + f"{self._session.ae_url}/{self._ep_suffix_init_state}", params={"type": ui_type, "name": name, "key": key}, ) ) @@ -134,15 +134,15 @@ def set_script(self, ui_type: str, name: str, path: str, after_app_id: str = "") "path": path, "afterAppId": after_app_id, } - self._session.ocs(method="POST", path=f"{self._session.ae_url}/{self._ep_suffix_js}", json=params) + self._session.ocs("POST", f"{self._session.ae_url}/{self._ep_suffix_js}", json=params) def delete_script(self, ui_type: str, name: str, path: str, not_fail=True) -> None: """Removes script for the page(template) by object name.""" require_capabilities("app_api", self._session.capabilities) try: self._session.ocs( - method="DELETE", - path=f"{self._session.ae_url}/{self._ep_suffix_js}", + "DELETE", + f"{self._session.ae_url}/{self._ep_suffix_js}", params={"type": ui_type, "name": name, "path": path}, ) except NextcloudExceptionNotFound as e: @@ -155,8 +155,8 @@ def get_script(self, ui_type: str, name: str, path: str) -> typing.Optional[UiSc try: return UiScript( self._session.ocs( - method="GET", - path=f"{self._session.ae_url}/{self._ep_suffix_js}", + "GET", + f"{self._session.ae_url}/{self._ep_suffix_js}", params={"type": ui_type, "name": name, "path": path}, ) ) @@ -171,15 +171,15 @@ def set_style(self, ui_type: str, name: str, path: str) -> None: "name": name, "path": path, } - self._session.ocs(method="POST", path=f"{self._session.ae_url}/{self._ep_suffix_css}", json=params) + self._session.ocs("POST", f"{self._session.ae_url}/{self._ep_suffix_css}", json=params) def delete_style(self, ui_type: str, name: str, path: str, not_fail=True) -> None: """Removes style(css) for the page(template) by object name.""" require_capabilities("app_api", self._session.capabilities) try: self._session.ocs( - method="DELETE", - path=f"{self._session.ae_url}/{self._ep_suffix_css}", + "DELETE", + f"{self._session.ae_url}/{self._ep_suffix_css}", params={"type": ui_type, "name": name, "path": path}, ) except NextcloudExceptionNotFound as e: @@ -192,8 +192,8 @@ def get_style(self, ui_type: str, name: str, path: str) -> typing.Optional[UiSty try: return UiStyle( self._session.ocs( - method="GET", - path=f"{self._session.ae_url}/{self._ep_suffix_css}", + "GET", + f"{self._session.ae_url}/{self._ep_suffix_css}", params={"type": ui_type, "name": name, "path": path}, ) ) diff --git a/nc_py_api/ex_app/ui/top_menu.py b/nc_py_api/ex_app/ui/top_menu.py index f9805745..1b15ccce 100644 --- a/nc_py_api/ex_app/ui/top_menu.py +++ b/nc_py_api/ex_app/ui/top_menu.py @@ -67,13 +67,13 @@ def register(self, name: str, display_name: str, icon: str = "", admin_required= "icon": icon, "adminRequired": int(admin_required), } - self._session.ocs(method="POST", path=f"{self._session.ae_url}/{self._ep_suffix}", json=params) + self._session.ocs("POST", f"{self._session.ae_url}/{self._ep_suffix}", json=params) def unregister(self, name: str, not_fail=True) -> None: """Removes App entry in Top Menu.""" require_capabilities("app_api", self._session.capabilities) try: - self._session.ocs(method="DELETE", path=f"{self._session.ae_url}/{self._ep_suffix}", params={"name": name}) + self._session.ocs("DELETE", f"{self._session.ae_url}/{self._ep_suffix}", params={"name": name}) except NextcloudExceptionNotFound as e: if not not_fail: raise e from None @@ -83,7 +83,7 @@ def get_entry(self, name: str) -> typing.Optional[UiTopMenuEntry]: require_capabilities("app_api", self._session.capabilities) try: return UiTopMenuEntry( - self._session.ocs(method="GET", path=f"{self._session.ae_url}/{self._ep_suffix}", params={"name": name}) + self._session.ocs("GET", f"{self._session.ae_url}/{self._ep_suffix}", params={"name": name}) ) except NextcloudExceptionNotFound: return None diff --git a/nc_py_api/files/files.py b/nc_py_api/files/files.py index 2450923c..25f207eb 100644 --- a/nc_py_api/files/files.py +++ b/nc_py_api/files/files.py @@ -126,7 +126,9 @@ def find(self, req: list, path: Union[str, FsNode] = "") -> list[FsNode]: self._build_search_req(xml_where, req) headers = {"Content-Type": "text/xml"} - webdav_response = self._session.dav("SEARCH", "", data=self._element_tree_as_str(root), headers=headers) + webdav_response = self._session.adapter_dav.request( + "SEARCH", "", content=self._element_tree_as_str(root), headers=headers + ) request_info = f"find: {self._session.user}, {req}, {path}" return self._lf_parse_webdav_response(webdav_response, request_info) @@ -136,8 +138,8 @@ def download(self, path: Union[str, FsNode]) -> bytes: :param path: path to download file. """ path = path.user_path if isinstance(path, FsNode) else path - response = self._session.dav("GET", self._dav_get_obj_path(self._session.user, path)) - check_error(response.status_code, f"download: user={self._session.user}, path={path}") + response = self._session.adapter_dav.get(self._dav_get_obj_path(self._session.user, path)) + check_error(response, f"download: user={self._session.user}, path={path}") return response.content def download2stream(self, path: Union[str, FsNode], fp, **kwargs) -> None: @@ -169,11 +171,10 @@ def download_directory_as_zip( .. note:: This works only for directories, you should not use this to download a file. """ path = path.user_path if isinstance(path, FsNode) else path - with self._session.get_stream( - "/index.php/apps/files/ajax/download.php", params={"dir": path} - ) as response: # type: ignore - self._session.response_headers = response.headers - check_error(response.status_code, f"download_directory_as_zip: user={self._session.user}, path={path}") + with self._session.adapter.stream( + "GET", "/index.php/apps/files/ajax/download.php", params={"dir": path} + ) as response: + check_error(response, f"download_directory_as_zip: user={self._session.user}, path={path}") result_path = local_path if local_path else os.path.basename(path) with open( result_path, @@ -191,8 +192,8 @@ def upload(self, path: Union[str, FsNode], content: Union[bytes, str]) -> FsNode """ path = path.user_path if isinstance(path, FsNode) else path full_path = self._dav_get_obj_path(self._session.user, path) - response = self._session.dav("PUT", full_path, data=content) - check_error(response.status_code, f"upload: user={self._session.user}, path={path}, size={len(content)}") + response = self._session.adapter_dav.put(full_path, content=content) + check_error(response, f"upload: user={self._session.user}, path={path}, size={len(content)}") return FsNode(full_path.strip("/"), **self.__get_etag_fileid_from_response(response)) def upload_stream(self, path: Union[str, FsNode], fp, **kwargs) -> FsNode: @@ -220,8 +221,8 @@ def mkdir(self, path: Union[str, FsNode]) -> FsNode: """ path = path.user_path if isinstance(path, FsNode) else path full_path = self._dav_get_obj_path(self._session.user, path) - response = self._session.dav("MKCOL", full_path) - check_error(response.status_code, f"mkdir: user={self._session.user}, path={path}") + response = self._session.adapter_dav.request("MKCOL", full_path) + check_error(response) full_path += "/" if not full_path.endswith("/") else "" return FsNode(full_path.lstrip("/"), **self.__get_etag_fileid_from_response(response)) @@ -255,10 +256,10 @@ def delete(self, path: Union[str, FsNode], not_fail=False) -> None: :param not_fail: if set to ``True`` and the object is not found, it does not raise an exception. """ path = path.user_path if isinstance(path, FsNode) else path - response = self._session.dav("DELETE", self._dav_get_obj_path(self._session.user, path)) + response = self._session.adapter_dav.delete(self._dav_get_obj_path(self._session.user, path)) if response.status_code == 404 and not_fail: return - check_error(response.status_code, f"delete: user={self._session.user}, path={path}") + check_error(response) def move(self, path_src: Union[str, FsNode], path_dest: Union[str, FsNode], overwrite=False) -> FsNode: """Moves an existing file or a directory. @@ -274,12 +275,12 @@ def move(self, path_src: Union[str, FsNode], path_dest: Union[str, FsNode], over ) dest = self._session.cfg.dav_endpoint + full_dest_path headers = Headers({"Destination": dest, "Overwrite": "T" if overwrite else "F"}, encoding="utf-8") - response = self._session.dav( + response = self._session.adapter_dav.request( "MOVE", self._dav_get_obj_path(self._session.user, path_src), headers=headers, ) - check_error(response.status_code, f"move: user={self._session.user}, src={path_src}, dest={dest}, {overwrite}") + check_error(response, f"move: user={self._session.user}, src={path_src}, dest={dest}, {overwrite}") return self.find(req=["eq", "fileid", response.headers["OC-FileId"]])[0] def copy(self, path_src: Union[str, FsNode], path_dest: Union[str, FsNode], overwrite=False) -> FsNode: @@ -296,12 +297,12 @@ def copy(self, path_src: Union[str, FsNode], path_dest: Union[str, FsNode], over ) dest = self._session.cfg.dav_endpoint + full_dest_path headers = Headers({"Destination": dest, "Overwrite": "T" if overwrite else "F"}, encoding="utf-8") - response = self._session.dav( + response = self._session.adapter_dav.request( "COPY", self._dav_get_obj_path(self._session.user, path_src), headers=headers, ) - check_error(response.status_code, f"copy: user={self._session.user}, src={path_src}, dest={dest}, {overwrite}") + check_error(response, f"copy: user={self._session.user}, src={path_src}, dest={dest}, {overwrite}") return self.find(req=["eq", "fileid", response.headers["OC-FileId"]])[0] def list_by_criteria( @@ -329,11 +330,11 @@ def list_by_criteria( for v in tags: tag_id = v.tag_id if isinstance(v, SystemTag) else v ElementTree.SubElement(xml_filter_rules, "oc:systemtag").text = str(tag_id) - webdav_response = self._session.dav( - "REPORT", self._dav_get_obj_path(self._session.user), data=self._element_tree_as_str(root) + webdav_response = self._session.adapter_dav.request( + "REPORT", self._dav_get_obj_path(self._session.user), content=self._element_tree_as_str(root) ) request_info = f"list_files_by_criteria: {self._session.user}" - check_error(webdav_response.status_code, request_info) + check_error(webdav_response, request_info) return self._lf_parse_webdav_response(webdav_response, request_info) def setfav(self, path: Union[str, FsNode], value: Union[int, bool]) -> None: @@ -350,10 +351,10 @@ def setfav(self, path: Union[str, FsNode], value: Union[int, bool]) -> None: xml_set = ElementTree.SubElement(root, "d:set") xml_set_prop = ElementTree.SubElement(xml_set, "d:prop") ElementTree.SubElement(xml_set_prop, "oc:favorite").text = str(int(bool(value))) - webdav_response = self._session.dav( - "PROPPATCH", self._dav_get_obj_path(self._session.user, path), data=self._element_tree_as_str(root) + webdav_response = self._session.adapter_dav.request( + "PROPPATCH", self._dav_get_obj_path(self._session.user, path), content=self._element_tree_as_str(root) ) - check_error(webdav_response.status_code, f"setfav: path={path}, value={value}") + check_error(webdav_response, f"setfav: path={path}, value={value}") def trashbin_list(self) -> list[FsNode]: """Returns a list of all entries in the TrashBin.""" @@ -373,12 +374,12 @@ def trashbin_restore(self, path: Union[str, FsNode]) -> None: dest = self._session.cfg.dav_endpoint + f"/trashbin/{self._session.user}/restore/{restore_name}" headers = Headers({"Destination": dest}, encoding="utf-8") - response = self._session.dav( + response = self._session.adapter_dav.request( "MOVE", - path=f"/trashbin/{self._session.user}/{path}", + f"/trashbin/{self._session.user}/{path}", headers=headers, ) - check_error(response.status_code, f"trashbin_restore: user={self._session.user}, src={path}, dest={dest}") + check_error(response, f"trashbin_restore: user={self._session.user}, src={path}, dest={dest}") def trashbin_delete(self, path: Union[str, FsNode], not_fail=False) -> None: """Deletes a file/directory permanently from the TrashBin. @@ -387,15 +388,14 @@ def trashbin_delete(self, path: Union[str, FsNode], not_fail=False) -> None: :param not_fail: if set to ``True`` and the object is not found, it does not raise an exception. """ path = path.user_path if isinstance(path, FsNode) else path - response = self._session.dav(method="DELETE", path=f"/trashbin/{self._session.user}/{path}") + response = self._session.adapter_dav.delete(f"/trashbin/{self._session.user}/{path}") if response.status_code == 404 and not_fail: return - check_error(response.status_code, f"delete_from_trashbin: user={self._session.user}, path={path}") + check_error(response) def trashbin_cleanup(self) -> None: """Empties the TrashBin.""" - response = self._session.dav(method="DELETE", path=f"/trashbin/{self._session.user}/trash") - check_error(response.status_code, f"trashbin_cleanup: user={self._session.user}") + check_error(self._session.adapter_dav.delete(f"/trashbin/{self._session.user}/trash")) def get_versions(self, file_object: FsNode) -> list[FsNode]: """Returns a list of all file versions if any.""" @@ -417,12 +417,12 @@ def restore_version(self, file_object: FsNode) -> None: require_capabilities("files.versioning", self._session.capabilities) dest = self._session.cfg.dav_endpoint + f"/versions/{self._session.user}/restore/{file_object.name}" headers = Headers({"Destination": dest}, encoding="utf-8") - response = self._session.dav( + response = self._session.adapter_dav.request( "MOVE", - path=f"/versions/{self._session.user}/{file_object.user_path}", + f"/versions/{self._session.user}/{file_object.user_path}", headers=headers, ) - check_error(response.status_code, f"restore_version: user={self._session.user}, src={file_object.user_path}") + check_error(response, f"restore_version: user={self._session.user}, src={file_object.user_path}") def list_tags(self) -> list[SystemTag]: """Returns list of the avalaible Tags.""" @@ -434,7 +434,7 @@ def list_tags(self) -> list[SystemTag]: prop_element = ElementTree.SubElement(root, "d:prop") for i in properties: ElementTree.SubElement(prop_element, i) - response = self._session.dav("PROPFIND", "/systemtags", self._element_tree_as_str(root)) + response = self._session.adapter_dav.request("PROPFIND", "/systemtags", content=self._element_tree_as_str(root)) result = [] records = self._webdav_response_to_records(response, "list_tags") for record in records: @@ -451,16 +451,15 @@ def create_tag(self, name: str, user_visible: bool = True, user_assignable: bool :param user_visible: Should be Tag visible in the UI. :param user_assignable: Can Tag be assigned from the UI. """ - response = self._session.dav( - "POST", - path="/systemtags", + response = self._session.adapter_dav.post( + "/systemtags", json={ "name": name, "userVisible": user_visible, "userAssignable": user_assignable, }, ) - check_error(response.status_code, info=f"create_tag({name})") + check_error(response, info=f"create_tag({name})") def update_tag( self, @@ -490,14 +489,16 @@ def update_tag( prop_element = ElementTree.SubElement(xml_set, "d:prop") for k, v in properties.items(): ElementTree.SubElement(prop_element, k).text = v - response = self._session.dav("PROPPATCH", f"/systemtags/{tag_id}", self._element_tree_as_str(root)) - check_error(response.status_code, info=f"update_tag({tag_id})") + response = self._session.adapter_dav.request( + "PROPPATCH", f"/systemtags/{tag_id}", content=self._element_tree_as_str(root) + ) + check_error(response) def delete_tag(self, tag_id: Union[int, SystemTag]) -> None: """Deletes the tag.""" tag_id = tag_id.tag_id if isinstance(tag_id, SystemTag) else tag_id - response = self._session.dav("DELETE", f"/systemtags/{tag_id}") - check_error(response.status_code, info=f"delete_tag({tag_id})") + response = self._session.adapter_dav.delete(f"/systemtags/{tag_id}") + check_error(response) def tag_by_name(self, tag_name: str) -> SystemTag: """Returns Tag info by its name if found or ``None`` otherwise.""" @@ -517,12 +518,13 @@ def unassign_tag(self, file_id: Union[FsNode, int], tag_id: Union[SystemTag, int def _file_change_tag_state( self, file_id: Union[FsNode, int], tag_id: Union[SystemTag, int], tag_state: bool ) -> None: - request = "PUT" if tag_state else "DELETE" fs_object = file_id.info.fileid if isinstance(file_id, FsNode) else file_id tag = tag_id.tag_id if isinstance(tag_id, SystemTag) else tag_id - response = self._session.dav(request, f"/systemtags-relations/files/{fs_object}/{tag}") + response = self._session.adapter_dav.request( + "PUT" if tag_state else "DELETE", f"/systemtags-relations/files/{fs_object}/{tag}" + ) check_error( - response.status_code, + response, info=f"({'Adding' if tag_state else 'Removing'} `{tag}` {'to' if tag_state else 'from'} {fs_object})", ) @@ -548,10 +550,10 @@ def _listdir( dav_path = self._dav_get_obj_path(f"trashbin/{user}/trash", path, root_path="") else: dav_path = self._dav_get_obj_path(user, path) - webdav_response = self._session.dav( + webdav_response = self._session.adapter_dav.request( "PROPFIND", dav_path, - self._element_tree_as_str(root), + content=self._element_tree_as_str(root), headers={"Depth": "infinity" if depth == -1 else str(depth)}, ) @@ -631,7 +633,7 @@ def _lf_parse_webdav_response( @staticmethod def _webdav_response_to_records(webdav_res: Response, info: str) -> list[dict]: - check_error(webdav_res.status_code, info=info) + check_error(webdav_res, info=info) if webdav_res.status_code != 207: # multistatus raise NextcloudException(webdav_res.status_code, "Response is not a multistatus.", info=info) response_data = loads(dumps(xmltodict.parse(webdav_res.text))) @@ -684,11 +686,8 @@ def _add_value(xml_element, val=None) -> None: _add_value(xml_element_where, where_part) def __download2stream(self, path: str, fp, **kwargs) -> None: - with self._session.dav_stream( - "GET", self._dav_get_obj_path(self._session.user, path) - ) as response: # type: ignore - self._session.response_headers = response.headers - check_error(response.status_code, f"download_stream: user={self._session.user}, path={path}") + with self._session.adapter_dav.stream("GET", self._dav_get_obj_path(self._session.user, path)) as response: + check_error(response, f"download_stream: user={self._session.user}, path={path}") for data_chunk in response.iter_raw(chunk_size=kwargs.get("chunk_size", 5 * 1024 * 1024)): fp.write(data_chunk) @@ -698,10 +697,10 @@ def __upload_stream(self, path: str, fp, chunk_size: int) -> FsNode: full_path = self._dav_get_obj_path(self._session.user, path) headers = Headers({"Destination": self._session.cfg.dav_endpoint + full_path}, encoding="utf-8") if _v2: - response = self._session.dav("MKCOL", _dav_path, headers=headers) + response = self._session.adapter_dav.request("MKCOL", _dav_path, headers=headers) else: - response = self._session.dav("MKCOL", _dav_path) - check_error(response.status_code) + response = self._session.adapter_dav.request("MKCOL", _dav_path) + check_error(response) try: start_bytes = end_bytes = chunk_number = 0 while True: @@ -710,31 +709,31 @@ def __upload_stream(self, path: str, fp, chunk_size: int) -> FsNode: break end_bytes = start_bytes + len(piece) if _v2: - response = self._session.dav( - "PUT", _dav_path + "/" + str(chunk_number), data=piece, headers=headers + response = self._session.adapter_dav.put( + _dav_path + "/" + str(chunk_number), content=piece, headers=headers ) else: _filename = str(start_bytes).rjust(15, "0") + "-" + str(end_bytes).rjust(15, "0") - response = self._session.dav("PUT", _dav_path + "/" + _filename, data=piece) + response = self._session.adapter_dav.put(_dav_path + "/" + _filename, content=piece) check_error( - response.status_code, + response, f"upload_stream(v={_v2}): user={self._session.user}, path={path}, cur_size={end_bytes}", ) start_bytes = end_bytes chunk_number += 1 - response = self._session.dav( + response = self._session.adapter_dav.request( "MOVE", _dav_path + "/.file", headers=headers, ) check_error( - response.status_code, + response, f"upload_stream(v={_v2}): user={self._session.user}, path={path}, total_size={end_bytes}", ) return FsNode(full_path.strip("/"), **self.__get_etag_fileid_from_response(response)) finally: - self._session.dav("DELETE", _dav_path) + self._session.adapter_dav.delete(_dav_path) @staticmethod def __get_etag_fileid_from_response(response: Response) -> dict: diff --git a/nc_py_api/files/sharing.py b/nc_py_api/files/sharing.py index 490f9959..6be0fefb 100644 --- a/nc_py_api/files/sharing.py +++ b/nc_py_api/files/sharing.py @@ -38,19 +38,19 @@ def get_list( } if path: params["path"] = path - result = self._session.ocs(method="GET", path=f"{self._ep_base}/shares", params=params) + result = self._session.ocs("GET", f"{self._ep_base}/shares", params=params) return [Share(i) for i in result] def get_by_id(self, share_id: int) -> Share: """Get Share by share ID.""" _misc.require_capabilities("files_sharing.api_enabled", self._session.capabilities) - result = self._session.ocs(method="GET", path=f"{self._ep_base}/shares/{share_id}") + result = self._session.ocs("GET", f"{self._ep_base}/shares/{share_id}") return Share(result[0] if isinstance(result, list) else result) def get_inherited(self, path: str) -> list[Share]: """Get all shares relative to a file, e.g., parent folders shares.""" _misc.require_capabilities("files_sharing.api_enabled", self._session.capabilities) - result = self._session.ocs(method="GET", path=f"{self._ep_base}/shares/inherited", params={"path": path}) + result = self._session.ocs("GET", f"{self._ep_base}/shares/inherited", params={"path": path}) return [Share(i) for i in result] def create( @@ -102,7 +102,7 @@ def create( params["note"] = kwargs["note"] if "label" in kwargs: params["label"] = kwargs["label"] - return Share(self._session.ocs(method="POST", path=f"{self._ep_base}/shares", params=params)) + return Share(self._session.ocs("POST", f"{self._ep_base}/shares", params=params)) def update(self, share_id: typing.Union[int, Share], **kwargs) -> Share: """Updates the share options. @@ -128,7 +128,7 @@ def update(self, share_id: typing.Union[int, Share], **kwargs) -> Share: params["note"] = kwargs["note"] if "label" in kwargs: params["label"] = kwargs["label"] - return Share(self._session.ocs(method="PUT", path=f"{self._ep_base}/shares/{share_id}", params=params)) + return Share(self._session.ocs("PUT", f"{self._ep_base}/shares/{share_id}", params=params)) def delete(self, share_id: typing.Union[int, Share]) -> None: """Removes the given share. @@ -137,31 +137,31 @@ def delete(self, share_id: typing.Union[int, Share]) -> None: """ _misc.require_capabilities("files_sharing.api_enabled", self._session.capabilities) share_id = share_id.share_id if isinstance(share_id, Share) else share_id - self._session.ocs(method="DELETE", path=f"{self._ep_base}/shares/{share_id}") + self._session.ocs("DELETE", f"{self._ep_base}/shares/{share_id}") def get_pending(self) -> list[Share]: """Returns all pending shares for current user.""" - return [Share(i) for i in self._session.ocs(method="GET", path=f"{self._ep_base}/shares/pending")] + return [Share(i) for i in self._session.ocs("GET", f"{self._ep_base}/shares/pending")] def accept_share(self, share_id: typing.Union[int, Share]) -> None: """Accept pending share.""" _misc.require_capabilities("files_sharing.api_enabled", self._session.capabilities) share_id = share_id.share_id if isinstance(share_id, Share) else share_id - self._session.ocs(method="POST", path=f"{self._ep_base}/pending/{share_id}") + self._session.ocs("POST", f"{self._ep_base}/pending/{share_id}") def decline_share(self, share_id: typing.Union[int, Share]) -> None: """Decline pending share.""" _misc.require_capabilities("files_sharing.api_enabled", self._session.capabilities) share_id = share_id.share_id if isinstance(share_id, Share) else share_id - self._session.ocs(method="DELETE", path=f"{self._ep_base}/pending/{share_id}") + self._session.ocs("DELETE", f"{self._ep_base}/pending/{share_id}") def get_deleted(self) -> list[Share]: """Get a list of deleted shares.""" _misc.require_capabilities("files_sharing.api_enabled", self._session.capabilities) - return [Share(i) for i in self._session.ocs(method="GET", path=f"{self._ep_base}/deletedshares")] + return [Share(i) for i in self._session.ocs("GET", f"{self._ep_base}/deletedshares")] def undelete(self, share_id: typing.Union[int, Share]) -> None: """Undelete a deleted share.""" _misc.require_capabilities("files_sharing.api_enabled", self._session.capabilities) share_id = share_id.share_id if isinstance(share_id, Share) else share_id - self._session.ocs(method="POST", path=f"{self._ep_base}/deletedshares/{share_id}") + self._session.ocs("POST", f"{self._ep_base}/deletedshares/{share_id}") diff --git a/nc_py_api/nextcloud.py b/nc_py_api/nextcloud.py index 07a1afdd..65477d60 100644 --- a/nc_py_api/nextcloud.py +++ b/nc_py_api/nextcloud.py @@ -166,13 +166,11 @@ def log(self, log_lvl: LogLvl, content: str) -> None: return if int(log_lvl) < self.capabilities["app_api"].get("loglevel", 0): return - self._session.ocs( - method="POST", path=f"{self._session.ae_url}/log", json={"level": int(log_lvl), "message": content} - ) + self._session.ocs("POST", f"{self._session.ae_url}/log", json={"level": int(log_lvl), "message": content}) def users_list(self) -> list[str]: """Returns list of users on the Nextcloud instance. **Available** only for ``System`` applications.""" - return self._session.ocs("GET", path=f"{self._session.ae_url}/users", params={"format": "json"}) + return self._session.ocs("GET", f"{self._session.ae_url}/users", params={"format": "json"}) def scope_allowed(self, scope: ApiScope) -> bool: """Check if API scope is avalaible for application. @@ -223,7 +221,7 @@ def register_talk_bot(self, callback_url: str, display_name: str, description: s "route": callback_url, "description": description, } - result = self._session.ocs(method="POST", path=f"{self._session.ae_url}/talk_bot", json=params) + result = self._session.ocs("POST", f"{self._session.ae_url}/talk_bot", json=params) return result["id"], result["secret"] def unregister_talk_bot(self, callback_url: str) -> bool: @@ -238,7 +236,7 @@ def unregister_talk_bot(self, callback_url: str) -> bool: "route": callback_url, } try: - self._session.ocs(method="DELETE", path=f"{self._session.ae_url}/talk_bot", json=params) + self._session.ocs("DELETE", f"{self._session.ae_url}/talk_bot", json=params) except NextcloudExceptionNotFound: return False return True @@ -265,11 +263,10 @@ def set_init_status(self, progress: int, error: str = "") -> None: :param error: if non-empty, signals to AppAPI that the application cannot be initialized successfully. """ self._session.ocs( - method="PUT", - path=f"/ocs/v1.php/apps/app_api/apps/status/{self._session.cfg.app_name}", + "PUT", + f"/ocs/v1.php/apps/app_api/apps/status/{self._session.cfg.app_name}", json={ "progress": progress, "error": error, }, - not_parse=True, ) diff --git a/nc_py_api/notes.py b/nc_py_api/notes.py index 72c2cc59..ba5b53ee 100644 --- a/nc_py_api/notes.py +++ b/nc_py_api/notes.py @@ -2,8 +2,12 @@ import dataclasses import datetime +import json import typing +import httpx + +from ._exceptions import check_error from ._misc import check_capabilities, clear_from_params_empty, require_capabilities from ._session import NcSessionBasic @@ -136,15 +140,17 @@ def get_list( } clear_from_params_empty(list(params.keys()), params) headers = {"If-None-Match": self.last_etag} if self.last_etag and etag else {} - r = self._session.request_json("GET", self._ep_base + "/notes", params=params, headers=headers) + r = self.__response_to_json(self._session.adapter.get(self._ep_base + "/notes", params=params, headers=headers)) self.last_etag = self._session.response_headers["ETag"] return [Note(i) for i in r] def by_id(self, note: Note) -> Note: """Get updated information about :py:class:`~nc_py_api.notes.Note`.""" require_capabilities("notes", self._session.capabilities) - r = self._session.request_json( - "GET", self._ep_base + f"/notes/{note.note_id}", headers={"If-None-Match": f'"{note.etag}"'} + r = self.__response_to_json( + self._session.adapter.get( + self._ep_base + f"/notes/{note.note_id}", headers={"If-None-Match": f'"{note.etag}"'} + ) ) return Note(r) if r else note @@ -166,7 +172,7 @@ def create( "modified": last_modified, } clear_from_params_empty(list(params.keys()), params) - return Note(self._session.request_json("POST", self._ep_base + "/notes", json=params)) + return Note(self.__response_to_json(self._session.adapter.post(self._ep_base + "/notes", json=params))) def update( self, @@ -193,7 +199,9 @@ def update( if not params: raise ValueError("Nothing to update.") return Note( - self._session.request_json("PUT", self._ep_base + f"/notes/{note.note_id}", json=params, headers=headers) + self.__response_to_json( + self._session.adapter.put(self._ep_base + f"/notes/{note.note_id}", json=params, headers=headers) + ) ) def delete(self, note: typing.Union[int, Note]) -> None: @@ -203,12 +211,12 @@ def delete(self, note: typing.Union[int, Note]) -> None: """ require_capabilities("notes", self._session.capabilities) note_id = note.note_id if isinstance(note, Note) else note - self._session.ocs("DELETE", self._ep_base + f"/notes/{note_id}", not_parse=True) + check_error(self._session.adapter.delete(self._ep_base + f"/notes/{note_id}")) def get_settings(self) -> NotesSettings: """Returns Notes App settings.""" require_capabilities("notes", self._session.capabilities) - r = self._session.request_json("GET", self._ep_base + "/settings") + r = self.__response_to_json(self._session.adapter.get(self._ep_base + "/settings")) return {"notes_path": r["notesPath"], "file_suffix": r["fileSuffix"]} def set_settings(self, notes_path: typing.Optional[str] = None, file_suffix: typing.Optional[str] = None) -> None: @@ -221,4 +229,9 @@ def set_settings(self, notes_path: typing.Optional[str] = None, file_suffix: typ "fileSuffix": file_suffix, } clear_from_params_empty(list(params.keys()), params) - self._session.ocs("PUT", self._ep_base + "/settings", not_parse=True, json=params) + check_error(self._session.adapter.put(self._ep_base + "/settings", json=params)) + + @staticmethod + def __response_to_json(response: httpx.Response) -> dict: + check_error(response) + return json.loads(response.text) if response.status_code != 304 else {} diff --git a/nc_py_api/notifications.py b/nc_py_api/notifications.py index b451da9b..acbf9577 100644 --- a/nc_py_api/notifications.py +++ b/nc_py_api/notifications.py @@ -126,17 +126,17 @@ def create( } if link: params["params"]["subject_params"]["link"] = link - return self._session.ocs(method="POST", path=f"{self._session.ae_url}/notification", json=params)["object_id"] + return self._session.ocs("POST", f"{self._session.ae_url}/notification", json=params)["object_id"] def get_all(self) -> list[Notification]: """Gets all notifications for a current user.""" require_capabilities("notifications", self._session.capabilities) - return [Notification(i) for i in self._session.ocs(method="GET", path=self._ep_base)] + return [Notification(i) for i in self._session.ocs("GET", self._ep_base)] def get_one(self, notification_id: int) -> Notification: """Gets a single notification for a current user.""" require_capabilities("notifications", self._session.capabilities) - return Notification(self._session.ocs(method="GET", path=f"{self._ep_base}/{notification_id}")) + return Notification(self._session.ocs("GET", f"{self._ep_base}/{notification_id}")) def by_object_id(self, object_id: str) -> typing.Optional[Notification]: """Returns Notification if any by its object ID. @@ -151,14 +151,14 @@ def by_object_id(self, object_id: str) -> typing.Optional[Notification]: def delete(self, notification_id: int) -> None: """Deletes a notification for the current user.""" require_capabilities("notifications", self._session.capabilities) - self._session.ocs(method="DELETE", path=f"{self._ep_base}/{notification_id}") + self._session.ocs("DELETE", f"{self._ep_base}/{notification_id}") def delete_all(self) -> None: """Deletes all notifications for the current user.""" require_capabilities("notifications", self._session.capabilities) - self._session.ocs(method="DELETE", path=self._ep_base) + self._session.ocs("DELETE", self._ep_base) def exists(self, notification_ids: list[int]) -> list[int]: """Checks the existence of notifications for the current user.""" require_capabilities("notifications", self._session.capabilities) - return self._session.ocs(method="POST", path=f"{self._ep_base}/exists", json={"ids": notification_ids}) + return self._session.ocs("POST", f"{self._ep_base}/exists", json={"ids": notification_ids}) diff --git a/nc_py_api/user_status.py b/nc_py_api/user_status.py index 113c564c..bd10976d 100644 --- a/nc_py_api/user_status.py +++ b/nc_py_api/user_status.py @@ -121,29 +121,22 @@ def available(self) -> bool: return not check_capabilities("user_status.enabled", self._session.capabilities) def get_list(self, limit: typing.Optional[int] = None, offset: typing.Optional[int] = None) -> list[UserStatus]: - """Returns statuses for all users. - - :param limit: limits the number of results. - :param offset: offset of results. - """ + """Returns statuses for all users.""" require_capabilities("user_status.enabled", self._session.capabilities) data = kwargs_to_params(["limit", "offset"], limit=limit, offset=offset) - result = self._session.ocs(method="GET", path=f"{self._ep_base}/statuses", params=data) + result = self._session.ocs("GET", f"{self._ep_base}/statuses", params=data) return [UserStatus(i) for i in result] def get_current(self) -> CurrentUserStatus: """Returns the current user status.""" require_capabilities("user_status.enabled", self._session.capabilities) - return CurrentUserStatus(self._session.ocs(method="GET", path=f"{self._ep_base}/user_status")) + return CurrentUserStatus(self._session.ocs("GET", f"{self._ep_base}/user_status")) def get(self, user_id: str) -> typing.Optional[UserStatus]: - """Returns the user status for the specified user. - - :param user_id: User ID for getting status. - """ + """Returns the user status for the specified user.""" require_capabilities("user_status.enabled", self._session.capabilities) try: - return UserStatus(self._session.ocs(method="GET", path=f"{self._ep_base}/statuses/{user_id}")) + return UserStatus(self._session.ocs("GET", f"{self._ep_base}/statuses/{user_id}")) except NextcloudExceptionNotFound: return None @@ -152,7 +145,7 @@ def get_predefined(self) -> list[PredefinedStatus]: if self._session.nc_version["major"] < 27: return [] require_capabilities("user_status.enabled", self._session.capabilities) - result = self._session.ocs(method="GET", path=f"{self._ep_base}/predefined_statuses") + result = self._session.ocs("GET", f"{self._ep_base}/predefined_statuses") return [PredefinedStatus(i) for i in result] def set_predefined(self, status_id: str, clear_at: int = 0) -> None: @@ -167,12 +160,12 @@ def set_predefined(self, status_id: str, clear_at: int = 0) -> None: params: dict[str, typing.Union[int, str]] = {"messageId": status_id} if clear_at: params["clearAt"] = clear_at - self._session.ocs(method="PUT", path=f"{self._ep_base}/user_status/message/predefined", params=params) + self._session.ocs("PUT", f"{self._ep_base}/user_status/message/predefined", params=params) def set_status_type(self, value: typing.Literal["online", "away", "dnd", "invisible", "offline"]) -> None: """Sets the status type for the current user.""" require_capabilities("user_status.enabled", self._session.capabilities) - self._session.ocs(method="PUT", path=f"{self._ep_base}/user_status/status", params={"statusType": value}) + self._session.ocs("PUT", f"{self._ep_base}/user_status/status", params={"statusType": value}) def set_status(self, message: typing.Optional[str] = None, clear_at: int = 0, status_icon: str = "") -> None: """Sets current user status. @@ -183,7 +176,7 @@ def set_status(self, message: typing.Optional[str] = None, clear_at: int = 0, st """ require_capabilities("user_status.enabled", self._session.capabilities) if message is None: - self._session.ocs(method="DELETE", path=f"{self._ep_base}/user_status/message") + self._session.ocs("DELETE", f"{self._ep_base}/user_status/message") return if status_icon: require_capabilities("user_status.supports_emoji", self._session.capabilities) @@ -192,13 +185,10 @@ def set_status(self, message: typing.Optional[str] = None, clear_at: int = 0, st params["clearAt"] = clear_at if status_icon: params["statusIcon"] = status_icon - self._session.ocs(method="PUT", path=f"{self._ep_base}/user_status/message/custom", params=params) + self._session.ocs("PUT", f"{self._ep_base}/user_status/message/custom", params=params) def get_backup_status(self, user_id: str = "") -> typing.Optional[UserStatus]: - """Get the backup status of the user if any. - - :param user_id: User ID for getting status. - """ + """Get the backup status of the user if any.""" require_capabilities("user_status.enabled", self._session.capabilities) user_id = user_id if user_id else self._session.user if not user_id: @@ -206,11 +196,8 @@ def get_backup_status(self, user_id: str = "") -> typing.Optional[UserStatus]: return self.get(f"_{user_id}") def restore_backup_status(self, status_id: str) -> typing.Optional[CurrentUserStatus]: - """Restores the backup state as current for the current user. - - :param status_id: backup status ID. - """ + """Restores the backup state as current for the current user.""" require_capabilities("user_status.enabled", self._session.capabilities) require_capabilities("user_status.restore", self._session.capabilities) - result = self._session.ocs(method="DELETE", path=f"{self._ep_base}/user_status/revert/{status_id}") + result = self._session.ocs("DELETE", f"{self._ep_base}/user_status/revert/{status_id}") return result if result else None diff --git a/nc_py_api/users.py b/nc_py_api/users.py index 69fb7be6..62da37cb 100644 --- a/nc_py_api/users.py +++ b/nc_py_api/users.py @@ -4,6 +4,7 @@ import datetime import typing +from ._exceptions import check_error from ._misc import kwargs_to_params from ._session import NcSessionBasic @@ -169,23 +170,14 @@ def __init__(self, session: NcSessionBasic): def get_list( self, mask: typing.Optional[str] = "", limit: typing.Optional[int] = None, offset: typing.Optional[int] = None ) -> list[str]: - """Returns list of user IDs. - - :param mask: user ID mask to apply. - :param limit: limits the number of results. - :param offset: offset of results. - """ + """Returns list of user IDs.""" data = kwargs_to_params(["search", "limit", "offset"], search=mask, limit=limit, offset=offset) - response_data = self._session.ocs(method="GET", path=self._ep_base, params=data) + response_data = self._session.ocs("GET", self._ep_base, params=data) return response_data["users"] if response_data else {} def get_user(self, user_id: str = "") -> UserInfo: - """Returns detailed user information. - - :param user_id: the identifier of the user about which information is to be returned. - """ - url_path = f"{self._ep_base}/{user_id}" if user_id else "/ocs/v1.php/cloud/user" - return UserInfo(self._session.ocs(method="GET", path=url_path)) + """Returns detailed user information.""" + return UserInfo(self._session.ocs("GET", f"{self._ep_base}/{user_id}" if user_id else "/ocs/v1.php/cloud/user")) def create(self, user_id: str, display_name: typing.Optional[str] = None, **kwargs) -> None: """Create a new user on the Nextcloud server. @@ -213,39 +205,27 @@ def create(self, user_id: str, display_name: typing.Optional[str] = None, **kwar data[k] = kwargs[k] if display_name is not None: data["displayname"] = display_name - self._session.ocs(method="POST", path=self._ep_base, json=data) + self._session.ocs("POST", self._ep_base, json=data) def delete(self, user_id: str) -> None: - """Deletes user from the Nextcloud server. - - :param user_id: id of the user. - """ - self._session.ocs(method="DELETE", path=f"{self._ep_base}/{user_id}") + """Deletes user from the Nextcloud server.""" + self._session.ocs("DELETE", f"{self._ep_base}/{user_id}") def enable(self, user_id: str) -> None: - """Enables user on the Nextcloud server. - - :param user_id: id of the user. - """ - self._session.ocs(method="PUT", path=f"{self._ep_base}/{user_id}/enable") + """Enables user on the Nextcloud server.""" + self._session.ocs("PUT", f"{self._ep_base}/{user_id}/enable") def disable(self, user_id: str) -> None: - """Disables user on the Nextcloud server. - - :param user_id: id of the user. - """ - self._session.ocs(method="PUT", path=f"{self._ep_base}/{user_id}/disable") + """Disables user on the Nextcloud server.""" + self._session.ocs("PUT", f"{self._ep_base}/{user_id}/disable") def resend_welcome_email(self, user_id: str) -> None: - """Send welcome email for specified user again. - - :param user_id: id of the user. - """ - self._session.ocs(method="POST", path=f"{self._ep_base}/{user_id}/welcome") + """Send welcome email for specified user again.""" + self._session.ocs("POST", f"{self._ep_base}/{user_id}/welcome") def editable_fields(self) -> list[str]: """Returns user fields that avalaible for edit.""" - return self._session.ocs(method="GET", path="/ocs/v1.php/cloud/user/fields") + return self._session.ocs("GET", "/ocs/v1.php/cloud/user/fields") def edit(self, user_id: str, **kwargs) -> None: """Edits user metadata. @@ -254,39 +234,23 @@ def edit(self, user_id: str, **kwargs) -> None: :param kwargs: dictionary where keys are values from ``editable_fields`` method, and values to set. """ for k, v in kwargs.items(): - self._session.ocs(method="PUT", path=f"{self._ep_base}/{user_id}", params={"key": k, "value": v}) + self._session.ocs("PUT", f"{self._ep_base}/{user_id}", params={"key": k, "value": v}) def add_to_group(self, user_id: str, group_id: str) -> None: - """Adds user to the group. - - :param user_id: ID of the user. - :param group_id: the destination group to which add user to. - """ - self._session.ocs(method="POST", path=f"{self._ep_base}/{user_id}/groups", params={"groupid": group_id}) + """Adds user to the group.""" + self._session.ocs("POST", f"{self._ep_base}/{user_id}/groups", params={"groupid": group_id}) def remove_from_group(self, user_id: str, group_id: str) -> None: - """Removes user from the group. - - :param user_id: ID of the user. - :param group_id: group from which remove user. - """ - self._session.ocs(method="DELETE", path=f"{self._ep_base}/{user_id}/groups", params={"groupid": group_id}) + """Removes user from the group.""" + self._session.ocs("DELETE", f"{self._ep_base}/{user_id}/groups", params={"groupid": group_id}) def promote_to_subadmin(self, user_id: str, group_id: str) -> None: - """Makes user admin of the group. - - :param user_id: ID of the user. - :param group_id: group where user should become administrator. - """ - self._session.ocs(method="POST", path=f"{self._ep_base}/{user_id}/subadmins", params={"groupid": group_id}) + """Makes user admin of the group.""" + self._session.ocs("POST", f"{self._ep_base}/{user_id}/subadmins", params={"groupid": group_id}) def demote_from_subadmin(self, user_id: str, group_id: str) -> None: - """Removes user from the admin role of the group. - - :param user_id: ID of the user. - :param group_id: group where user should be removed from administrators. - """ - self._session.ocs(method="DELETE", path=f"{self._ep_base}/{user_id}/subadmins", params={"groupid": group_id}) + """Removes user from the admin role of the group.""" + self._session.ocs("DELETE", f"{self._ep_base}/{user_id}/subadmins", params={"groupid": group_id}) def get_avatar( self, user_id: str = "", size: typing.Literal[64, 512] = 512, dark: bool = False, guest: bool = False @@ -304,4 +268,6 @@ def get_avatar( url_path = f"/index.php/avatar/{user_id}/{size}" if not guest else f"/index.php/avatar/guest/{user_id}/{size}" if dark: url_path += "/dark" - return self._session.request(method="GET", path=url_path).content + response = self._session.adapter.get(url_path) + check_error(response) + return response.content diff --git a/nc_py_api/users_groups.py b/nc_py_api/users_groups.py index 64220917..f8a8e5d2 100644 --- a/nc_py_api/users_groups.py +++ b/nc_py_api/users_groups.py @@ -62,67 +62,40 @@ def __init__(self, session: NcSessionBasic): def get_list( self, mask: typing.Optional[str] = None, limit: typing.Optional[int] = None, offset: typing.Optional[int] = None ) -> list[str]: - """Returns a list of user groups IDs. - - :param mask: group ID mask to apply. - :param limit: limits the number of results. - :param offset: offset of results. - """ + """Returns a list of user groups IDs.""" data = kwargs_to_params(["search", "limit", "offset"], search=mask, limit=limit, offset=offset) - response_data = self._session.ocs(method="GET", path=self._ep_base, params=data) + response_data = self._session.ocs("GET", self._ep_base, params=data) return response_data["groups"] if response_data else [] def get_details( self, mask: typing.Optional[str] = None, limit: typing.Optional[int] = None, offset: typing.Optional[int] = None ) -> list[GroupDetails]: - """Returns a list of user groups with detailed information. - - :param mask: group ID mask to apply. - :param limit: limits the number of results. - :param offset: offset of results. - """ + """Returns a list of user groups with detailed information.""" data = kwargs_to_params(["search", "limit", "offset"], search=mask, limit=limit, offset=offset) - response_data = self._session.ocs(method="GET", path=f"{self._ep_base}/details", params=data) + response_data = self._session.ocs("GET", f"{self._ep_base}/details", params=data) return [GroupDetails(i) for i in response_data["groups"]] if response_data else [] def create(self, group_id: str, display_name: typing.Optional[str] = None) -> None: - """Creates the users group. - - :param group_id: the ID of group to be created. - :param display_name: display name for a created group. - """ + """Creates the users group.""" params = {"groupid": group_id} if display_name is not None: params["displayname"] = display_name - self._session.ocs(method="POST", path=f"{self._ep_base}", params=params) + self._session.ocs("POST", f"{self._ep_base}", params=params) def edit(self, group_id: str, display_name: str) -> None: - """Edits users group information. - - :param group_id: the ID of group to edit info. - :param display_name: new group display name. - """ + """Edits users group information.""" params = {"key": "displayname", "value": display_name} - self._session.ocs(method="PUT", path=f"{self._ep_base}/{group_id}", params=params) + self._session.ocs("PUT", f"{self._ep_base}/{group_id}", params=params) def delete(self, group_id: str) -> None: - """Removes the users group. - - :param group_id: the ID of group to remove. - """ - self._session.ocs(method="DELETE", path=f"{self._ep_base}/{group_id}") + """Removes the users group.""" + self._session.ocs("DELETE", f"{self._ep_base}/{group_id}") def get_members(self, group_id: str) -> list[str]: - """Returns a list of group users. - - :param group_id: Group ID to get the list of members. - """ - response_data = self._session.ocs(method="GET", path=f"{self._ep_base}/{group_id}") + """Returns a list of group users.""" + response_data = self._session.ocs("GET", f"{self._ep_base}/{group_id}") return response_data["users"] if response_data else {} def get_subadmins(self, group_id: str) -> list[str]: - """Returns list of users who is subadmins of the group. - - :param group_id: group ID to get the list of subadmins. - """ - return self._session.ocs(method="GET", path=f"{self._ep_base}/{group_id}/subadmins") + """Returns list of users who is subadmins of the group.""" + return self._session.ocs("GET", f"{self._ep_base}/{group_id}/subadmins") diff --git a/nc_py_api/weather_status.py b/nc_py_api/weather_status.py index f78a5026..184e96ad 100644 --- a/nc_py_api/weather_status.py +++ b/nc_py_api/weather_status.py @@ -57,7 +57,7 @@ def available(self) -> bool: def get_location(self) -> WeatherLocation: """Returns the current location set on the Nextcloud server for the user.""" require_capabilities("weather_status.enabled", self._session.capabilities) - return WeatherLocation(self._session.ocs(method="GET", path=f"{self._ep_base}/location")) + return WeatherLocation(self._session.ocs("GET", f"{self._ep_base}/location")) def set_location( self, @@ -79,23 +79,23 @@ def set_location( params["address"] = address else: raise ValueError("latitude & longitude or address should be present") - result = self._session.ocs(method="PUT", path=f"{self._ep_base}/location", params=params) + result = self._session.ocs("PUT", f"{self._ep_base}/location", params=params) return result.get("success", False) def get_forecast(self) -> list[dict]: """Get forecast for the current location.""" require_capabilities("weather_status.enabled", self._session.capabilities) - return self._session.ocs(method="GET", path=f"{self._ep_base}/forecast") + return self._session.ocs("GET", f"{self._ep_base}/forecast") def get_favorites(self) -> list[str]: """Returns favorites addresses list.""" require_capabilities("weather_status.enabled", self._session.capabilities) - return self._session.ocs(method="GET", path=f"{self._ep_base}/favorites") + return self._session.ocs("GET", f"{self._ep_base}/favorites") def set_favorites(self, favorites: list[str]) -> bool: """Sets favorites addresses list.""" require_capabilities("weather_status.enabled", self._session.capabilities) - result = self._session.ocs(method="PUT", path=f"{self._ep_base}/favorites", json={"favorites": favorites}) + result = self._session.ocs("PUT", f"{self._ep_base}/favorites", json={"favorites": favorites}) return result.get("success", False) def set_mode(self, mode: WeatherLocationMode) -> bool: @@ -103,5 +103,5 @@ def set_mode(self, mode: WeatherLocationMode) -> bool: if int(mode) == WeatherLocationMode.UNKNOWN.value: raise ValueError("This mode can not be set") require_capabilities("weather_status.enabled", self._session.capabilities) - result = self._session.ocs(method="PUT", path=f"{self._ep_base}/mode", params={"mode": int(mode)}) + result = self._session.ocs("PUT", f"{self._ep_base}/mode", params={"mode": int(mode)}) return result.get("success", False) diff --git a/tests/actual_tests/appcfg_prefs_ex_test.py b/tests/actual_tests/appcfg_prefs_ex_test.py index 349b8c8c..b73af876 100644 --- a/tests/actual_tests/appcfg_prefs_ex_test.py +++ b/tests/actual_tests/appcfg_prefs_ex_test.py @@ -124,21 +124,21 @@ def test_appcfg_sensitive(nc_app): appcfg.delete("test_key") # next code tests `sensitive` value from the `AppAPI` params = {"configKey": "test_key", "configValue": "123"} - result = nc_app._session.ocs(method="POST", path=f"{nc_app._session.ae_url}/{appcfg._url_suffix}", json=params) + result = nc_app._session.ocs("POST", f"{nc_app._session.ae_url}/{appcfg._url_suffix}", json=params) assert not result["sensitive"] # by default if sensitive value is unspecified it is False appcfg.delete("test_key") params = {"configKey": "test_key", "configValue": "123", "sensitive": True} - result = nc_app._session.ocs(method="POST", path=f"{nc_app._session.ae_url}/{appcfg._url_suffix}", json=params) + result = nc_app._session.ocs("POST", f"{nc_app._session.ae_url}/{appcfg._url_suffix}", json=params) assert result["configkey"] == "test_key" assert result["configvalue"] == "123" assert bool(result["sensitive"]) is True params.pop("sensitive") # if we not specify value, AppEcosystem should not change it. - result = nc_app._session.ocs(method="POST", path=f"{nc_app._session.ae_url}/{appcfg._url_suffix}", json=params) + result = nc_app._session.ocs("POST", f"{nc_app._session.ae_url}/{appcfg._url_suffix}", json=params) assert result["configkey"] == "test_key" assert result["configvalue"] == "123" assert bool(result["sensitive"]) is True params["sensitive"] = False - result = nc_app._session.ocs(method="POST", path=f"{nc_app._session.ae_url}/{appcfg._url_suffix}", json=params) + result = nc_app._session.ocs("POST", f"{nc_app._session.ae_url}/{appcfg._url_suffix}", json=params) assert result["configkey"] == "test_key" assert result["configvalue"] == "123" assert bool(result["sensitive"]) is False diff --git a/tests/actual_tests/logs_test.py b/tests/actual_tests/logs_test.py index e9c2ca3f..96ecc188 100644 --- a/tests/actual_tests/logs_test.py +++ b/tests/actual_tests/logs_test.py @@ -41,11 +41,11 @@ def test_loglvl_less(nc_app): current_log_lvl = nc_app.capabilities["app_api"].get("loglevel", LogLvl.FATAL) if current_log_lvl == LogLvl.DEBUG: pytest.skip("Log lvl to low") - with mock.patch("tests.conftest.NC_APP._session._ocs") as _ocs: + with mock.patch("tests.conftest.NC_APP._session.ocs") as ocs: nc_app.log(int(current_log_lvl) - 1, "will not be sent") # noqa - _ocs.assert_not_called() + ocs.assert_not_called() nc_app.log(current_log_lvl, "will be sent") - assert _ocs.call_count > 0 + assert ocs.call_count > 0 def test_log_without_app_api(nc_app): @@ -56,7 +56,7 @@ def test_log_without_app_api(nc_app): patched_capabilities = {"capabilities": srv_capabilities, "version": srv_version} with ( mock.patch.dict("tests.conftest.NC_APP._session._capabilities", patched_capabilities, clear=True), - mock.patch("tests.conftest.NC_APP._session._ocs") as _ocs, + mock.patch("tests.conftest.NC_APP._session.ocs") as ocs, ): nc_app.log(log_lvl, "will not be sent") - _ocs.assert_not_called() + ocs.assert_not_called() diff --git a/tests/actual_tests/misc_test.py b/tests/actual_tests/misc_test.py index c402e444..c0dc05ce 100644 --- a/tests/actual_tests/misc_test.py +++ b/tests/actual_tests/misc_test.py @@ -2,6 +2,7 @@ import os import pytest +from httpx import Request, Response from nc_py_api import Nextcloud, NextcloudApp, NextcloudException, ex_app from nc_py_api._deffered_error import DeferredError # noqa @@ -14,9 +15,9 @@ def test_check_error(code): if 996 <= code <= 999: with pytest.raises(NextcloudException): - check_error(code) + check_error(Response(code, request=Request(method="GET", url="https://example"))) else: - check_error(code) + check_error(Response(code, request=Request(method="GET", url="https://example"))) def test_nc_exception_to_str(): @@ -58,9 +59,9 @@ def test_deffered_error(): unknown_non_exist_module.some_class_or_func() -def test_ocs_response_headers(nc): +def test_response_headers(nc): old_headers = nc.response_headers - nc.users.get_user() + nc.users.get_user(nc.user) # do not remove "nc.user" arguments, it helps to trigger response header updates. assert old_headers != nc.response_headers diff --git a/tests/actual_tests/talk_test.py b/tests/actual_tests/talk_test.py index 3f38e5cf..77201ed9 100644 --- a/tests/actual_tests/talk_test.py +++ b/tests/actual_tests/talk_test.py @@ -4,7 +4,7 @@ import pytest from PIL import Image -from nc_py_api import Nextcloud, files, talk, talk_bot +from nc_py_api import Nextcloud, NextcloudException, files, talk, talk_bot def test_conversation_create_delete(nc): @@ -242,6 +242,7 @@ def test_list_bots(nc, nc_app): nc.talk.delete_conversation(conversation.token) +@pytest.mark.skipif(environ.get("CI", None) is None, reason="run only on GitHub") @pytest.mark.require_nc(major=27, minor=1) def test_chat_bot_receive_message(nc_app): if nc_app.talk.bots_available is False: @@ -382,6 +383,8 @@ def test_conversation_avatar(nc_any): assert r.is_custom_avatar is True r = nc_any.talk.get_conversation_avatar(conversation, dark=True) assert isinstance(r, bytes) + with pytest.raises(NextcloudException): + nc_any.talk.get_conversation_avatar("not_exist_conversation") finally: nc_any.talk.delete_conversation(conversation) diff --git a/tests/actual_tests/users_test.py b/tests/actual_tests/users_test.py index e2e28063..7e8ef1b1 100644 --- a/tests/actual_tests/users_test.py +++ b/tests/actual_tests/users_test.py @@ -155,3 +155,5 @@ def test_avatars(nc): for i in (im, im_64, im_black, im_64_black): img = Image.open(BytesIO(i)) img.load() + with pytest.raises(NextcloudException): + nc.users.get_avatar("not_existing_user")