1- from fastapi import FastAPI
1+ import uuid
2+ from typing import Union
3+
4+ import jwt
5+ from fastapi import Depends , FastAPI , Header
26from fastapi .middleware .cors import CORSMiddleware
37from mangum import Mangum
8+ from starlette import status
9+
10+ from config import Config
11+ from models import Task
12+ from schemas import APITask , APITaskList , CreateTask , CloseTask
13+ from store import TaskStore
414
515app = FastAPI ()
16+
617app .add_middleware (
718 CORSMiddleware ,
819 allow_origins = "*" ,
920 allow_credentials = True ,
1021 allow_methods = ["*" ],
1122 allow_headers = ["*" ],
1223)
24+ config = Config ()
25+
26+
27+ def get_task_store () -> TaskStore :
28+ return TaskStore (config .TABLE_NAME , dynamodb_url = config .DYNAMODB_URL )
29+
30+
31+ def get_user_email (authorization : Union [str , None ] = Header (default = None )) -> str :
32+ return jwt .decode (authorization , options = {"verify_signature" : False })[
33+ "cognito:username"
34+ ]
1335
1436
1537@app .get ("/api/health-check/" )
1638def health_check ():
1739 return {"message" : "OK" }
1840
1941
42+ @app .post (
43+ "/api/create-task" , response_model = APITask , status_code = status .HTTP_201_CREATED
44+ )
45+ def create_task (
46+ parameters : CreateTask ,
47+ user_email : str = Depends (get_user_email ),
48+ task_store : TaskStore = Depends (get_task_store ),
49+ ):
50+ task = Task .create (id_ = uuid .uuid4 (), title = parameters .title , owner = user_email )
51+ task_store .add (task )
52+
53+ return task
54+
55+
56+ @app .get ("/api/open-tasks" , response_model = APITaskList )
57+ def open_tasks (
58+ user_email : str = Depends (get_user_email ),
59+ task_store : TaskStore = Depends (get_task_store ),
60+ ):
61+ results = task_store .list_open (owner = user_email )
62+ print ("Results: " , results )
63+ return APITaskList (results = results )
64+
65+
2066handle = Mangum (app )
67+
68+
69+ @app .post ("/api/close-task" , response_model = APITask )
70+ def close_task (
71+ parameters : CloseTask ,
72+ user_email : str = Depends (get_user_email ),
73+ task_store : TaskStore = Depends (get_task_store ),
74+ ):
75+ task = task_store .get_by_id (task_id = parameters .id , owner = user_email )
76+ task .close ()
77+ task_store .add (task )
78+
79+ return task
80+
81+
82+ @app .get ("/api/closed-tasks" , response_model = APITaskList )
83+ def closed_tasks (
84+ user_email : str = Depends (get_user_email ),
85+ task_store : TaskStore = Depends (get_task_store ),
86+ ):
87+ return APITaskList (results = task_store .list_closed (owner = user_email ))
0 commit comments