Update support

Add: Microsoft Account Support
Add: Login status persistence (Microsoft account)
This commit is contained in:
MemoryShadow 2022-07-30 12:38:43 +08:00
parent bcd156e83b
commit 71f79ba87e
2 changed files with 274 additions and 13 deletions

View File

@ -1,6 +1,7 @@
import requests
import json
import uuid
import os
from .exceptions import YggdrasilError
#: The base url for Ygdrassil requests
@ -264,6 +265,247 @@ class AuthenticationToken(object):
_raise_from_response(res)
return True
class Microsoft_AuthenticationToken(object):
"""
Represents an authentication token.
See https://wiki.vg/Microsoft_Authentication_Scheme.
"""
UserLoginURL = "https://login.live.com/oauth20_authorize.srf?client_id=00000000402b5328&response_type=code\
&scope=service%3A%3Auser.auth.xboxlive.com%3A%3AMBI_SSL&redirect_uri=https%3A%2F%2Flogin.live.com%2Foauth20_desktop.srf"
oauth20_URL = 'https://login.live.com/oauth20_token.srf'
XBL_URL = 'https://user.auth.xboxlive.com/user/authenticate'
XSTS_URL = 'https://xsts.auth.xboxlive.com/xsts/authorize'
LOGIN_WITH_XBOX_URL = 'https://api.minecraftservices.com/authentication/login_with_xbox'
CheckAccount_URL = 'https://api.minecraftservices.com/entitlements/mcstore'
Profile_URL = 'https://api.minecraftservices.com/minecraft/profile'
jwt_Token=''
def __init__(self, access_token=None):
self.access_token = access_token
self.profile = Profile()
def GetoAuth20(self, code: str='') -> object:
if code == '':
print("Please copy this link to your browser to open: \n%s" % self.UserLoginURL)
code = input("After logging in, paste the 'code' field in your browser's address bar here:")
oauth20 = requests.post(self.oauth20_URL, data={
"client_id":"00000000402b5328",
"code":f"{code}",
"grant_type":"authorization_code",
"redirect_uri":"https://login.live.com/oauth20_desktop.srf",
"scope":"service::user.auth.xboxlive.com::MBI_SSL"
},
headers={"content-type": "application/x-www-form-urlencoded"},
timeout=15
)
oauth20 = json.loads(oauth20.text)
if 'error' in oauth20:
print("Error: %s" % oauth20["error"])
return 1
else:
self.oauth20_access_token=oauth20['access_token']
self.oauth20_refresh_token=oauth20['refresh_token']
oauth20_access_token=oauth20['access_token']
oauth20_refresh_token=oauth20['refresh_token']
return {"access_token":oauth20_access_token,"refresh_token":oauth20_refresh_token}
def GetXBL(self, access_token: str) -> object:
XBL = requests.post(self.XBL_URL,
json={"Properties": {"AuthMethod": "RPS","SiteName": "user.auth.xboxlive.com","RpsTicket": f"{access_token}"},
"RelyingParty": "http://auth.xboxlive.com","TokenType": "JWT"},
headers=HEADERS, timeout=15
)
return {
"Token": json.loads(XBL.text)['Token'],
"uhs": json.loads(XBL.text)['DisplayClaims']['xui'][0]['uhs']
}
def GetXSTS(self, access_token: str) -> object:
XBL = requests.post(self.XSTS_URL,
json={
"Properties": {"SandboxId": "RETAIL","UserTokens": [f"{access_token}"]},
"RelyingParty": "rp://api.minecraftservices.com/","TokenType": "JWT"
},
headers=HEADERS, timeout=15
)
return {
"Token": json.loads(XBL.text)['Token'],
"uhs": json.loads(XBL.text)['DisplayClaims']['xui'][0]['uhs']
}
def GetXBOX(self, access_token: str,uhs: str) -> str:
mat_jwt = requests.post(self.LOGIN_WITH_XBOX_URL, json={"identityToken": f"XBL3.0 x={uhs};{access_token}"},
headers=HEADERS, timeout=15)
self.access_token = json.loads(mat_jwt.text)['access_token']
return self.access_token
def CheckAccount(self, jwt_Token: str) -> bool:
CheckAccount = requests.get(self.CheckAccount_URL, headers={"Authorization": f"Bearer {jwt_Token}"},
timeout=15)
CheckAccount = len(json.loads(CheckAccount.text)['items'])
if CheckAccount != 0:
return True
else:
return False
def GetProfile(self, access_token: str) -> object:
if self.CheckAccount(access_token):
Profile = requests.get(self.Profile_URL, headers={"Authorization": f"Bearer {access_token}"},
timeout=15)
Profile = json.loads(Profile.text)
if 'error' in Profile:
return False
self.profile.id_ = Profile["id"]
self.profile.name = Profile["name"]
self.username = Profile["name"]
return True
else:
return False
@property
def authenticated(self):
"""
Attribute which is ``True`` when the token is authenticated and
``False`` when it isn't.
"""
if not self.username:
return False
if not self.access_token:
return False
if not self.oauth20_refresh_token:
return False
if not self.profile:
return False
return True
def authenticate(self):
"Get verification information for a Microsoft account"
oauth20 = self.GetoAuth20()
XBL = self.GetXBL(oauth20['access_token'])
XSTS = self.GetXSTS(XBL['Token'])
XBOX = self.GetXBOX(XSTS['Token'],XSTS['uhs'])
if self.GetProfile(XBOX):
print(f'GameID: {self.profile.id_}')
self.PersistenceLogoin_w()
return True
else:
print('Account does not exist')
return False
def refresh(self):
"""
Refreshes the `AuthenticationToken`. Used to keep a user logged in
between sessions and is preferred over storing a user's password in a
file.
Returns:
Returns `True` if `AuthenticationToken` was successfully refreshed.
Otherwise it raises an exception.
Raises:
minecraft.exceptions.YggdrasilError
ValueError - if `AuthenticationToken.access_token` or
`AuthenticationToken.client_token` isn't set.
"""
if self.access_token is None:
raise ValueError("'access_token' not set!'")
if self.oauth20_refresh_token is None:
raise ValueError("'oauth20_refresh_token' is not set!")
oauth20 = requests.post(self.oauth20_URL,data={
"client_id":"00000000402b5328",
"refresh_token":f"{self.oauth20_refresh_token}",
"grant_type":"refresh_token",
"redirect_uri":"https://login.live.com/oauth20_desktop.srf",
"scope":"service::user.auth.xboxlive.com::MBI_SSL"
},
headers={"content-type": "application/x-www-form-urlencoded"},
timeout=15
)
oauth20 = json.loads(oauth20.text)
if 'error' in oauth20:
print("Error: %s" % oauth20["error"])
return False
else:
self.oauth20_access_token=oauth20['access_token']
self.oauth20_refresh_token=oauth20['refresh_token']
XBL = self.GetXBL(self.oauth20_access_token)
XSTS = self.GetXSTS(XBL['Token'])
XBOX = self.GetXBOX(XSTS['Token'],XSTS['uhs'])
if self.GetProfile(XBOX):
print(f'账户: {self.profile.id_}')
return True
else:
print('账户不存在')
return False
def join(self, server_id):
"""
Informs the Mojang session-server that we're joining the
MineCraft server with id ``server_id``.
Parameters:
server_id - ``str`` with the server id
Returns:
``True`` if no errors occured
Raises:
:class:`minecraft.exceptions.YggdrasilError`
"""
if not self.authenticated:
err = "AuthenticationToken hasn't been authenticated yet!"
raise YggdrasilError(err)
res = _make_request(SESSION_SERVER, "join",
{"accessToken": self.access_token,
"selectedProfile": self.profile.to_dict(),
"serverId": server_id})
if res.status_code != 204:
_raise_from_response(res)
return True
def PersistenceLogoin_w(self):
"Save access token persistent login"
if not self.authenticated:
err = "AuthenticationToken hasn't been authenticated yet!"
raise YggdrasilError(err)
if not os.path.exists("Persistence"):
os.mkdir("Persistence")
"Save access_token and oauth20_refresh_token"
with open(f"Persistence/{self.username}", mode='w', encoding='utf-8') as file_obj:
file_obj.write(f'{{"access_token": "{self.access_token}","oauth20_refresh_token": "{self.oauth20_refresh_token}"}}')
file_obj.close()
return True
def PersistenceLogoin_r(self, GameID: str):
"Load access token persistent login"
if not os.path.exists("Persistence"):
return False
"Load access_token and oauth20_refresh_token"
if os.path.isfile(f"Persistence/{GameID}"):
with open(f"Persistence/{GameID}", mode='r', encoding='utf-8') as file_obj:
Persistence = file_obj.read()
file_obj.close()
Persistence = json.loads(Persistence)
self.access_token = Persistence["access_token"]
self.oauth20_refresh_token = Persistence["oauth20_refresh_token"]
self.GetProfile(self.access_token)
return self.authenticated
else:
return False
def _make_request(server, endpoint, data):
"""

View File

@ -14,8 +14,12 @@ from minecraft.networking.packets import Packet, clientbound, serverbound
def get_options():
parser = OptionParser()
parser.add_option("-a", "--authentication-method", dest="auth",
default="microsoft",
help="what to use for authentication, allowed values are: microsoft, mojang")
parser.add_option("-u", "--username", dest="username", default=None,
help="username to log in with")
help="User name used for login, if AUTH is microsoft and a persistent archive is detected locally, the persistent login information will be read first")
parser.add_option("-p", "--password", dest="password", default=None,
help="password to log in with")
@ -38,13 +42,15 @@ def get_options():
(options, args) = parser.parse_args()
if not options.username:
options.username = input("Enter your username: ")
if options.auth == 'mojang':
if not options.password and not options.offline:
options.password = getpass.getpass("Enter your password (leave "
"blank for offline mode): ")
options.offline = options.offline or (options.password == "")
if not options.username:
options.username = input("Enter your username: ")
if not options.password and not options.offline:
options.password = getpass.getpass("Enter your password (leave "
"blank for offline mode): ")
options.offline = options.offline or (options.password == "")
if not options.server:
options.server = input("Enter server host or host:port "
@ -68,12 +74,25 @@ def main():
connection = Connection(
options.address, options.port, username=options.username)
else:
auth_token = authentication.AuthenticationToken()
try:
auth_token.authenticate(options.username, options.password)
except YggdrasilError as e:
print(e)
sys.exit()
if options.auth == "mojang":
auth_token = authentication.AuthenticationToken()
try:
auth_token.authenticate(options.username, options.password)
except YggdrasilError as e:
print(e)
sys.exit()
elif options.auth == "microsoft":
auth_token = authentication.Microsoft_AuthenticationToken()
try:
if options.username:
if not auth_token.PersistenceLogoin_r(options.username):
print(f"登陆 {options.username} 失败")
else:
auth_token.authenticate()
except YggdrasilError as e:
print(e)
sys.exit()
print("Logged in as %s..." % auth_token.username)
connection = Connection(
options.address, options.port, auth_token=auth_token)