- don't crash on mouse events - load the correct project when a task is multihomed
132 lines
3.4 KiB
Python
Executable File
132 lines
3.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import json
|
|
import os
|
|
|
|
import asana
|
|
from asana.session import AsanaOAuth2Session
|
|
import urwid
|
|
|
|
import secrets
|
|
from ui.auth import AuthPrompt
|
|
from ui.constants import palette
|
|
from ui.ui import Ui, loading
|
|
|
|
from asana_service import AsanaService
|
|
|
|
class NotAuthedException(Exception):
|
|
def __init__(self):
|
|
super(NotAuthedException, self)
|
|
|
|
class CmdAsana(object):
|
|
loop = None
|
|
nav_stack = []
|
|
|
|
"""Try to get an Asana client using stored tokens
|
|
|
|
Raises:
|
|
NotAuthedException: the user has not authorized the app
|
|
"""
|
|
def get_client(self):
|
|
try:
|
|
f = open('.oauth', 'r')
|
|
auth_json = f.read()
|
|
f.close()
|
|
token = json.loads(auth_json)
|
|
self.client = asana.Client.oauth(
|
|
client_id=secrets.CLIENT_ID,
|
|
client_secret=secrets.CLIENT_SECRET,
|
|
token=token,
|
|
token_updater=self.save_token,
|
|
auto_refresh_url=AsanaOAuth2Session.token_url,
|
|
auto_refresh_kwargs={
|
|
'client_id': secrets.CLIENT_ID,
|
|
'client_secret': secrets.CLIENT_SECRET
|
|
}
|
|
)
|
|
except IOError:
|
|
raise NotAuthedException()
|
|
|
|
def authorize(self):
|
|
self.client = asana.Client.oauth(
|
|
client_id=secrets.CLIENT_ID,
|
|
client_secret=secrets.CLIENT_SECRET,
|
|
redirect_uri='urn:ietf:wg:oauth:2.0:oob',
|
|
token_updater=self.save_token,
|
|
auto_refresh_url=AsanaOAuth2Session.token_url,
|
|
auto_refresh_kwargs={
|
|
'client_id': secrets.CLIENT_ID,
|
|
'client_secret': secrets.CLIENT_SECRET
|
|
}
|
|
)
|
|
(url, _) = self.client.session.authorization_url()
|
|
auth = AuthPrompt(url, self.auth_callback)
|
|
|
|
try:
|
|
import webbrowser
|
|
webbrowser.open(url)
|
|
except Exception:
|
|
pass
|
|
|
|
self.loop = urwid.MainLoop(
|
|
auth.component(),
|
|
unhandled_input=self.exit_handler,
|
|
palette=palette
|
|
)
|
|
self.loop.run()
|
|
|
|
def auth_callback(self, code):
|
|
self.save_token(
|
|
self.client.session.fetch_token(code=code))
|
|
raise urwid.ExitMainLoop()
|
|
|
|
|
|
def exit_handler(self, key):
|
|
if key in ('q', 'Q', 'esc'):
|
|
raise urwid.ExitMainLoop()
|
|
if key == 'backspace':
|
|
self.ui.go_back()
|
|
|
|
def get_asana_service(self):
|
|
self.asana_service = AsanaService(self.client)
|
|
|
|
def get_ui(self):
|
|
self.ui = Ui(self.asana_service, self.update)
|
|
|
|
def run(self):
|
|
self.placeholder = urwid.WidgetPlaceholder(loading())
|
|
self.loop = urwid.MainLoop(
|
|
self.placeholder,
|
|
unhandled_input=self.exit_handler,
|
|
palette=palette
|
|
)
|
|
self.ui.my_tasks()
|
|
|
|
def update(self, widget):
|
|
self.loop.widget.original_widget = widget
|
|
try:
|
|
self.loop.draw_screen()
|
|
except Exception:
|
|
self.loop.run()
|
|
|
|
def save_token(self, token):
|
|
f = open('.oauth', 'w')
|
|
f.write(json.dumps(token))
|
|
f.close()
|
|
os.chmod('.oauth', 0o600)
|
|
|
|
def main():
|
|
cmd_asana = CmdAsana()
|
|
try:
|
|
cmd_asana.get_client()
|
|
except NotAuthedException:
|
|
cmd_asana.authorize()
|
|
|
|
cmd_asana.get_asana_service()
|
|
cmd_asana.get_ui()
|
|
|
|
cmd_asana.run()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|