conflicts

This commit is contained in:
Brian Merriam 2023-03-21 00:58:39 +00:00
commit 2de22bdae0
4 changed files with 326 additions and 33 deletions

View File

@ -64,7 +64,6 @@ author = u"Ammar Askar"
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = minecraft.__version__
# The full version, including alpha/beta/rc tags.
@ -124,7 +123,6 @@ todo_include_todos = True
if os.environ.get("READTHEDOCS", "") != "True":
try:
import sphinx_rtd_theme
html_theme = "sphinx_rtd_theme"
html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
except ImportError:
@ -227,10 +225,13 @@ htmlhelp_basename = "pyCraftdoc"
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
# 'preamble': '',
# Latex figure (float) alignment
# 'figure_align': 'htbp',
}
@ -239,11 +240,8 @@ latex_elements = {
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc,
"pyCraft.tex",
u"pyCraft Documentation",
u"Ammar Askar",
"manual"),
(master_doc, 'pyCraft.tex', u'pyCraft Documentation',
u'Ammar Askar', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
@ -283,15 +281,9 @@ man_pages = [(master_doc, "pycraft", u"pyCraft Documentation", [author], 1)]
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(
master_doc,
"pyCraft",
u"pyCraft Documentation",
author,
"pyCraft",
"One line description of project.",
"Miscellaneous",
),
(master_doc, 'pyCraft', u'pyCraft Documentation',
author, 'pyCraft', 'One line description of project.',
'Miscellaneous'),
]
# Documents to append as an appendix to all manuals.

View File

@ -73,16 +73,23 @@ def get_options():
help="include unknown packets in --dump-packets output",
)
parser.add_option(
"-m",
"--microsoft",
dest="microsoft",
action="store_true",
help="Enable Microsoft Auth")
(options, args) = parser.parse_args()
if not options.username:
options.username = input("Enter your username: ")
if not options.microsoft:
if not options.username:
options.username = input("Enter your username: ")
if not options.password and not options.offline:
options.password = getpass.getpass(
"Enter your password (leave " "blank for offline mode): "
)
options.offline = options.offline or (options.password == "")
if not options.password and not options.offline:
options.password = getpass.getpass("Enter your password (leave "
"blank for offline mode): ")
options.offline = options.offline or (options.password == "")
if not options.server:
options.server = input(
@ -125,9 +132,10 @@ def main():
options.address, options.port, username=options.username
)
else:
auth_token = authentication.AuthenticationToken()
try:
auth_token.authenticate(options.username, options.password)
auth_token = authentication.Microsoft_AuthenticationToken()
auth_token.authenticate()
except YggdrasilError as e:
print(e)
sys.exit()
@ -135,7 +143,9 @@ def main():
connection = Connection(
options.address,
options.port,
auth_token=auth_token)
auth_token,
None,
"1.8")
if options.dump_packets:

View File

@ -1,6 +1,7 @@
import requests
import json
import uuid
import os
from .exceptions import YggdrasilError
#: The base url for Ygdrassil requests
@ -314,3 +315,295 @@ def _raise_from_response(res):
exception.yggdrasil_cause = json_resp.get("cause")
raise exception
class Microsoft_AuthenticationToken(object):
"""
Represents an authentication token.
See https://wiki.vg/Microsoft_Authentication_Scheme.
This class was shameless copied from github issue,
https://github.com/ammaraskar/pyCraft/issues/234.
The user https://github.com/shikukuya, contributed the comment:
https://github.com/ammaraskar/pyCraft/issues/234#issuecomment-1365140050
I have simply created a fork and submitted the changes as they describe
the usage in their comment. All credit for this goes to shikukua.
"""
UserLoginURL = "https://login.live.com/oauth20_authorize.srf?\
client_id=00000000402b5328&response_type=code\
&scope=service%3A%3Auser.auth.xboxlive.com%3A%3AMBI_SSL&redirect_uri=\
https%3A%2F%2Flogin.live.com%2Foauth20_desktop.srf"
oauth20_URL = 'https://login.live.com/oauth20_token.srf'
XBL_URL = 'https://user.auth.xboxlive.com/user/authenticate'
XSTS_URL = 'https://xsts.auth.xboxlive.com/xsts/authorize'
LOGIN_WITH_XBOX_URL = "https://api.minecraftservices.com/\
authentication/login_with_xbox"
CheckAccount_URL = 'https://api.minecraftservices.com/entitlements/mcstore'
Profile_URL = 'https://api.minecraftservices.com/minecraft/profile'
jwt_Token = ''
def __init__(self, access_token=None):
self.access_token = access_token
self.profile = Profile()
def GetoAuth20(self, code='') -> object:
if code == '':
print("Please copy this link to your browser to open:"
"\n%s" % self.UserLoginURL)
code = input(
"After logging in,"
"paste the 'code' field in your browser's address bar here:")
oauth20 = requests.post(
self.oauth20_URL,
data={
"client_id": "00000000402b5328",
"code": "{}".format(code),
"grant_type": "authorization_code",
"redirect_uri": "https://login.live.com/oauth20_desktop.srf",
"scope": "service::user.auth.xboxlive.com::MBI_SSL"
},
headers={"content-type": "application/x-www-form-urlencoded"},
timeout=15)
oauth20 = json.loads(oauth20.text)
if 'error' in oauth20:
print("Error: %s" % oauth20["error"])
return 1
else:
self.oauth20_access_token = oauth20['access_token']
self.oauth20_refresh_token = oauth20['refresh_token']
oauth20_access_token = oauth20['access_token']
oauth20_refresh_token = oauth20['refresh_token']
return {
"access_token": oauth20_access_token,
"refresh_token": oauth20_refresh_token
}
def GetXBL(self, access_token: str) -> object:
XBL = requests.post(self.XBL_URL,
json={
"Properties": {
"AuthMethod": "RPS",
"SiteName": "user.auth.xboxlive.com",
"RpsTicket": "{}".format(access_token)
},
"RelyingParty": "http://auth.xboxlive.com",
"TokenType": "JWT"
},
headers=HEADERS,
timeout=15)
return {
"Token": json.loads(XBL.text)['Token'],
"uhs": json.loads(XBL.text)['DisplayClaims']['xui'][0]['uhs']
}
def GetXSTS(self, access_token: str) -> object:
XBL = requests.post(self.XSTS_URL,
json={
"Properties": {
"SandboxId": "RETAIL",
"UserTokens": ["{}".format(access_token)]
},
"RelyingParty":
"rp://api.minecraftservices.com/",
"TokenType": "JWT"
},
headers=HEADERS,
timeout=15)
return {
"Token": json.loads(XBL.text)['Token'],
"uhs": json.loads(XBL.text)['DisplayClaims']['xui'][0]['uhs']
}
def GetXBOX(self, access_token: str, uhs: str) -> str:
mat_jwt = requests.post(
self.LOGIN_WITH_XBOX_URL,
json={"identityToken": "XBL3.0 x={};{}".format(uhs, access_token)},
headers=HEADERS,
timeout=15)
self.access_token = json.loads(mat_jwt.text)['access_token']
return self.access_token
def CheckAccount(self, jwt_Token: str) -> bool:
CheckAccount = requests.get(
self.CheckAccount_URL,
headers={"Authorization": "Bearer {}".format(jwt_Token)},
timeout=15)
CheckAccount = len(json.loads(CheckAccount.text)['items'])
if CheckAccount != 0:
return True
else:
return False
def GetProfile(self, access_token: str) -> object:
if self.CheckAccount(access_token):
Profile = requests.get(
self.Profile_URL,
headers={"Authorization": "Bearer {}".format(access_token)},
timeout=15)
Profile = json.loads(Profile.text)
if 'error' in Profile:
return False
self.profile.id_ = Profile["id"]
self.profile.name = Profile["name"]
self.username = Profile["name"]
return True
else:
return False
@property
def authenticated(self):
"""
Attribute which is ``True`` when the token is authenticated and
``False`` when it isn't.
"""
if not self.username:
return False
if not self.access_token:
return False
if not self.oauth20_refresh_token:
return False
if not self.profile:
return False
return True
def authenticate(self):
"Get verification information for a Microsoft account"
oauth20 = self.GetoAuth20()
if oauth20 == 1:
return False
XBL = self.GetXBL(oauth20['access_token'])
XSTS = self.GetXSTS(XBL['Token'])
XBOX = self.GetXBOX(XSTS['Token'], XSTS['uhs'])
if self.GetProfile(XBOX):
print('GameID: {}'.format(self.profile.id_))
self.PersistenceLogoin_w()
return True
else:
print('Account does not exist')
return False
def refresh(self):
"""
Refreshes the `AuthenticationToken`. Used to keep a user logged in
between sessions and is preferred over storing a user's password in a
file.
Returns:
Returns `True` if `AuthenticationToken` was successfully refreshed.
Otherwise it raises an exception.
Raises:
minecraft.exceptions.YggdrasilError
ValueError - if `AuthenticationToken.access_token` or
`AuthenticationToken.client_token` isn't set.
"""
if self.access_token is None:
raise ValueError("'access_token' not set!'")
if self.oauth20_refresh_token is None:
raise ValueError("'oauth20_refresh_token' is not set!")
oauth20 = requests.post(
self.oauth20_URL,
data={
"client_id": "00000000402b5328",
"refresh_token": "{}".format(self.oauth20_refresh_token),
"grant_type": "refresh_token",
"redirect_uri": "https://login.live.com/oauth20_desktop.srf",
"scope": "service::user.auth.xboxlive.com::MBI_SSL"
},
headers={"content-type": "application/x-www-form-urlencoded"},
timeout=15)
oauth20 = json.loads(oauth20.text)
if 'error' in oauth20:
print("Error: %s" % oauth20["error"])
return False
else:
self.oauth20_access_token = oauth20['access_token']
self.oauth20_refresh_token = oauth20['refresh_token']
XBL = self.GetXBL(self.oauth20_access_token)
XSTS = self.GetXSTS(XBL['Token'])
XBOX = self.GetXBOX(XSTS['Token'], XSTS['uhs'])
if self.GetProfile(XBOX):
self.PersistenceLogoin_w()
print('account: {}'.format(self.profile.id_))
return True
else:
print('Account does not exist')
return False
def join(self, server_id):
"""
Informs the Mojang session-server that we're joining the
MineCraft server with id ``server_id``.
Parameters:
server_id - ``str`` with the server id
Returns:
``True`` if no errors occured
Raises:
:class:`minecraft.exceptions.YggdrasilError`
"""
if not self.authenticated:
err = "AuthenticationToken hasn't been authenticated yet!"
raise YggdrasilError(err)
res = _make_request(
SESSION_SERVER, "join", {
"accessToken": self.access_token,
"selectedProfile": self.profile.to_dict(),
"serverId": server_id
})
if res.status_code != 204:
_raise_from_response(res)
return True
def PersistenceLogoin_w(self):
"Save access token persistent login"
ProjectDir = os.path.dirname(os.path.dirname('{}'.format(__file__)))
PersistenceDir = '{}/Persistence'.format(ProjectDir)
if not self.authenticated:
err = "AuthenticationToken hasn't been authenticated yet!"
raise YggdrasilError(err)
if not os.path.exists(PersistenceDir):
os.mkdir(PersistenceDir)
print(PersistenceDir)
"Save access_token and oauth20_refresh_token"
with open("{}/{}".format(PersistenceDir, self.username),
mode='w',
encoding='utf-8') as file_obj:
file_obj.write('{{"{}": "{}","{}": "{}"}}'.format(
'access_token', self.access_token, 'oauth20_refresh_token',
self.oauth20_refresh_token))
file_obj.close()
return True
def PersistenceLogoin_r(self, GameID: str):
"Load access token persistent login"
ProjectDir = os.path.dirname(os.path.dirname('{}'.format(__file__)))
PersistenceDir = '{}/Persistence'.format(ProjectDir)
if not os.path.exists(PersistenceDir):
return False
"Load access_token and oauth20_refresh_token"
if os.path.isfile("{}/{}".format(PersistenceDir, GameID)):
with open("{}/{}".format(PersistenceDir, GameID),
mode='r',
encoding='utf-8') as file_obj:
Persistence = file_obj.read()
file_obj.close()
Persistence = json.loads(Persistence)
self.access_token = Persistence["access_token"]
self.oauth20_refresh_token = Persistence[
"oauth20_refresh_token"]
self.refresh()
return self.authenticated
else:
return False

View File

@ -498,9 +498,8 @@ class Connection(object):
# versions that always resolved hostnames to IPv4 addresses),
# then IPv6, then other address families.
def key(ai):
return (0 if ai[0] == socket.AF_INET else 1 if ai[0]
== socket.AF_INET6 else 2)
return 0 if ai[0] == socket.AF_INET else \
1 if ai[0] == socket.AF_INET6 else 2
ai_faml, ai_type, ai_prot, _ai_cnam, ai_addr = min(info, key=key)
self.socket = socket.socket(ai_faml, ai_type, ai_prot)
@ -753,10 +752,9 @@ class PacketReactor(object):
decompressor = zlib.decompressobj()
decompressed_packet = decompressor.decompress(
packet_data.read())
assert len(decompressed_packet) == decompressed_size, (
"decompressed length %d, but expected %d"
% (len(decompressed_packet), decompressed_size)
)
assert len(decompressed_packet) == decompressed_size, \
'decompressed length %d, but expected %d' % \
(len(decompressed_packet), decompressed_size)
packet_data.reset()
packet_data.send(decompressed_packet)
packet_data.reset_cursor()