# Source code for arbor_imago.services.api_key

from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
import datetime as datetime_module
from typing import cast

from arbor_imago import custom_types, core_utils
from arbor_imago.models.tables import ApiKey as ApiKeyTable
from arbor_imago.schemas import api_key as api_key_schema, auth_credential as auth_credential_schema
from arbor_imago.services import auth_credential as auth_credential_service, base


class ApiKey(
    base.Service[
        ApiKeyTable,
        custom_types.ApiKey.id,
        api_key_schema.ApiKeyAdminCreate,
        api_key_schema.ApiKeyAdminUpdate,
        custom_types.ApiKey.order_by,
    ],
    base.SimpleIdModelService[ApiKeyTable, custom_types.ApiKey.id],
    auth_credential_service.JwtIO[ApiKeyTable, custom_types.ApiKey.id],
    auth_credential_service.Table[ApiKeyTable],
    auth_credential_service.JwtAndSimpleIdTable[ApiKeyTable, custom_types.ApiKey.id],
):
    """Service for ``ApiKeyTable`` rows.

    Combines the generic service bases with the auth-credential mixins and
    supplies the API-key specific hooks: building a table instance from a
    create schema, listing scope ids, checking name availability, and the
    authorization / validation checks below.
    """

    auth_type = auth_credential_schema.Type.API_KEY
    _MODEL = ApiKeyTable

    @classmethod
    def model_inst_from_create_model(cls, create_model):
        """Build a new table instance from ``create_model``.

        A fresh id is generated with ``core_utils.generate_uuid()`` and the
        ``issued`` timestamp is set to the current time as a timezone-aware
        UTC datetime; all remaining fields come from
        ``create_model.model_dump()``.
        """
        return cls._MODEL(
            id=custom_types.ApiKey.id(core_utils.generate_uuid()),
            # Ask for UTC directly instead of converting a naive local time.
            issued=datetime_module.datetime.now(datetime_module.UTC),
            **create_model.model_dump(),
        )

    @classmethod
    async def get_scope_ids(cls, session, inst):
        """Return the ``scope_id`` of every entry in ``inst.api_key_scopes``.

        ``session`` is accepted but not used by this implementation.
        """
        return [api_key_scope.scope_id for api_key_scope in inst.api_key_scopes]

    @classmethod
    async def is_available(
        cls,
        session: AsyncSession,
        api_key_available_admin: api_key_schema.ApiKeyAdminAvailable,
    ) -> bool:
        """Return ``True`` when the user has no API key with the given name.

        Args:
            session: Async database session used to run the lookup.
            api_key_available_admin: Carries the ``name`` and ``user_id``
                to check.

        Returns:
            ``True`` if no row matches both ``name`` and ``user_id``
            (the name is free), ``False`` otherwise.
        """
        # Query the table model and compare its columns; the request schema
        # only carries the values to compare against.
        statement = select(cls._MODEL).where(
            cls._MODEL.name == api_key_available_admin.name,
            cls._MODEL.user_id == api_key_available_admin.user_id,
        )
        # Any existing row means the name is taken, so one row is enough.
        existing = (await session.exec(statement)).first()
        return existing is None

    @classmethod
    async def _check_authorization_new(cls, params):
        """Reject non-admin callers creating a key for a different user.

        Raises:
            base.UnauthorizedError: if ``params['admin']`` is falsy and
                ``params['create_model'].user_id`` differs from
                ``params['authorized_user_id']``.
        """
        if params['admin']:
            return
        if params['authorized_user_id'] != params['create_model'].user_id:
            raise base.UnauthorizedError(
                'Unauthorized to post API Key for another user'
            )

    @classmethod
    async def _check_authorization_existing(cls, params):
        """Reject non-admin callers touching a key they do not own.

        Raises:
            base.NotFoundError: if ``params['admin']`` is falsy and
                ``params['model_inst'].user_id`` differs from
                ``params['authorized_user_id']``.
        """
        if params['admin']:
            return
        # NOTE(review): "not found" rather than "unauthorized" presumably
        # avoids revealing that the key exists - confirm intent.
        if params['model_inst'].user_id != params['authorized_user_id']:
            raise base.NotFoundError(ApiKeyTable, params['id'])

    @classmethod
    async def _check_validation_post(cls, params):
        """Ensure the key described by ``params['create_model']`` is available.

        Raises:
            base.NotAvailableError: if ``is_available`` reports the name is
                already taken.
        """
        create_model = params['create_model']
        available_fields = create_model.model_dump(exclude_unset=True)
        # The dump may already contain ``user_id``; passing it a second time
        # as a keyword would raise TypeError. Keep the owner from the payload
        # when present and fall back to the authorized user otherwise.
        available_fields.setdefault(
            'user_id',
            cast(custom_types.User.id, params['authorized_user_id']),
        )

        candidate = api_key_schema.ApiKeyAdminAvailable(**available_fields)
        if not await cls.is_available(params['session'], candidate):
            raise base.NotAvailableError(
                'Cannot create API Key {} for user {}, not available'.format(
                    str(create_model), params['authorized_user_id']
                )
            )

    @classmethod
    async def _check_validation_patch(cls, params):
        """Ensure a renamed key does not collide with an existing one.

        The check only runs when ``name`` is among the fields explicitly set
        on ``params['update_model']``.

        Raises:
            base.NotAvailableError: if ``is_available`` reports the new name
                is already taken.
        """
        update_model = params['update_model']
        if 'name' not in update_model.model_fields_set:
            return

        # NOTE(review): availability is checked against the authorized user,
        # not the owner of the key being patched - confirm this is intended
        # when an admin edits another user's key.
        candidate = api_key_schema.ApiKeyAdminAvailable(
            name=cast(custom_types.ApiKey.name, update_model.name),
            user_id=cast(custom_types.User.id, params['authorized_user_id']),
        )
        if not await cls.is_available(params['session'], candidate):
            raise base.NotAvailableError(
                'Cannot update API Key {} for user {}, not available'.format(
                    str(update_model), params['authorized_user_id']
                )
            )