pyCraft/minecraft/authentication.py

316 lines
9.7 KiB
Python

import requests
import json
import uuid
from .exceptions import YggdrasilError
#: The base url for Ygdrassil requests
AUTH_SERVER = "https://authserver.mojang.com"
SESSION_SERVER = "https://sessionserver.mojang.com/session/minecraft"
# Need this content type, or authserver will complain
CONTENT_TYPE = "application/json"
HEADERS = {"content-type": CONTENT_TYPE}
class Profile(object):
"""
Container class for a MineCraft Selected profile.
See: `<http://wiki.vg/Authentication>`_
"""
def __init__(self, id_=None, name=None):
self.id_ = id_
self.name = name
def to_dict(self):
"""
Returns ``self`` in dictionary-form, which can be serialized by json.
"""
if self:
return {"id": self.id_,
"name": self.name}
else:
raise AttributeError("Profile is not yet populated.")
def __bool__(self):
bool_state = self.id_ is not None and self.name is not None
return bool_state
# Python 2 support
def __nonzero__(self):
return self.__bool__()
class AuthenticationToken(object):
"""
Represents an authentication token.
See http://wiki.vg/Authentication.
"""
AGENT_NAME = "Minecraft"
AGENT_VERSION = 1
def __init__(self, username=None, access_token=None, client_token=None):
"""
Constructs an `AuthenticationToken` based on `access_token` and
`client_token`.
Parameters:
access_token - An `str` object containing the `access_token`.
client_token - An `str` object containing the `client_token`.
Returns:
A `AuthenticationToken` with `access_token` and `client_token` set.
"""
self.username = username
self.access_token = access_token
self.client_token = client_token
self.profile = Profile()
@property
def authenticated(self):
"""
Attribute which is ``True`` when the token is authenticated and
``False`` when it isn't.
"""
if not self.username:
return False
if not self.access_token:
return False
if not self.client_token:
return False
if not self.profile:
return False
return True
def authenticate(self, username, password, invalidate_previous=False):
"""
Authenticates the user against https://authserver.mojang.com using
`username` and `password` parameters.
Parameters:
username - An `str` object with the username (unmigrated accounts)
or email address for a Mojang account.
password - An `str` object with the password.
invalidate_previous - A `bool`. When `True`, invalidate
all previously acquired `access_token`s across all clients.
Returns:
Returns `True` if successful.
Otherwise it will raise an exception.
Raises:
minecraft.exceptions.YggdrasilError
"""
payload = {
"agent": {
"name": self.AGENT_NAME,
"version": self.AGENT_VERSION
},
"username": username,
"password": password
}
if not invalidate_previous:
# Include a `client_token` in the payload to prevent existing
# `access_token`s from being invalidated. If `self.client_token`
# is `None` generate a `client_token` using uuid4
payload["clientToken"] = self.client_token or uuid.uuid4().hex
res = _make_request(AUTH_SERVER, "authenticate", payload)
_raise_from_response(res)
json_resp = res.json()
self.username = username
self.access_token = json_resp["accessToken"]
self.client_token = json_resp["clientToken"]
self.profile.id_ = json_resp["selectedProfile"]["id"]
self.profile.name = json_resp["selectedProfile"]["name"]
return True
def refresh(self):
"""
Refreshes the `AuthenticationToken`. Used to keep a user logged in
between sessions and is preferred over storing a user's password in a
file.
Returns:
Returns `True` if `AuthenticationToken` was successfully refreshed.
Otherwise it raises an exception.
Raises:
minecraft.exceptions.YggdrasilError
ValueError - if `AuthenticationToken.access_token` or
`AuthenticationToken.client_token` isn't set.
"""
if self.access_token is None:
raise ValueError("'access_token' not set!'")
if self.client_token is None:
raise ValueError("'client_token' is not set!")
res = _make_request(AUTH_SERVER,
"refresh", {"accessToken": self.access_token,
"clientToken": self.client_token})
_raise_from_response(res)
json_resp = res.json()
self.access_token = json_resp["accessToken"]
self.client_token = json_resp["clientToken"]
self.profile.id_ = json_resp["selectedProfile"]["id"]
self.profile.name = json_resp["selectedProfile"]["name"]
return True
def validate(self):
"""
Validates the AuthenticationToken.
`AuthenticationToken.access_token` must be set!
Returns:
Returns `True` if `AuthenticationToken` is valid.
Otherwise it will raise an exception.
Raises:
minecraft.exceptions.YggdrasilError
ValueError - if `AuthenticationToken.access_token` is not set.
"""
if self.access_token is None:
raise ValueError("'access_token' not set!")
res = _make_request(AUTH_SERVER, "validate",
{"accessToken": self.access_token})
# Validate returns 204 to indicate success
# http://wiki.vg/Authentication#Response_3
if res.status_code == 204:
return True
@staticmethod
def sign_out(username, password):
"""
Invalidates `access_token`s using an account's
`username` and `password`.
Parameters:
username - ``str`` containing the username
password - ``str`` containing the password
Returns:
Returns `True` if sign out was successful.
Otherwise it will raise an exception.
Raises:
minecraft.exceptions.YggdrasilError
"""
res = _make_request(AUTH_SERVER, "signout",
{"username": username, "password": password})
if _raise_from_response(res) is None:
return True
def invalidate(self):
"""
Invalidates `access_token`s using the token pair stored in
the `AuthenticationToken`.
Returns:
``True`` if tokens were successfully invalidated.
Raises:
:class:`minecraft.exceptions.YggdrasilError`
"""
res = _make_request(AUTH_SERVER, "invalidate",
{"accessToken": self.access_token,
"clientToken": self.client_token})
if res.status_code != 204:
_raise_from_response(res)
return True
def join(self, server_id):
"""
Informs the Mojang session-server that we're joining the
MineCraft server with id ``server_id``.
Parameters:
server_id - ``str`` with the server id
Returns:
``True`` if no errors occured
Raises:
:class:`minecraft.exceptions.YggdrasilError`
"""
if not self.authenticated:
err = "AuthenticationToken hasn't been authenticated yet!"
raise YggdrasilError(err)
res = _make_request(SESSION_SERVER, "join",
{"accessToken": self.access_token,
"selectedProfile": self.profile.to_dict(),
"serverId": server_id})
if res.status_code != 204:
_raise_from_response(res)
return True
def _make_request(server, endpoint, data):
"""
Fires a POST with json-packed data to the given endpoint and returns
response.
Parameters:
endpoint - An `str` object with the endpoint, e.g. "authenticate"
data - A `dict` containing the payload data.
Returns:
A `requests.Request` object.
"""
res = requests.post(server + "/" + endpoint, data=json.dumps(data),
headers=HEADERS, timeout=15)
return res
def _raise_from_response(res):
"""
Raises an appropriate `YggdrasilError` based on the `status_code` and
`json` of a `requests.Request` object.
"""
if res.status_code == requests.codes['ok']:
return None
exception = YggdrasilError()
exception.status_code = res.status_code
try:
json_resp = res.json()
if not ("error" in json_resp and "errorMessage" in json_resp):
raise ValueError
except ValueError:
message = "[{status_code}] Malformed error message: '{response_text}'"
message = message.format(status_code=str(res.status_code),
response_text=res.text)
exception.args = (message,)
else:
message = "[{status_code}] {error}: '{error_message}'"
message = message.format(status_code=str(res.status_code),
error=json_resp["error"],
error_message=json_resp["errorMessage"])
exception.args = (message,)
exception.yggdrasil_error = json_resp["error"]
exception.yggdrasil_message = json_resp["errorMessage"]
exception.yggdrasil_cause = json_resp.get("cause")
raise exception