Skip to content
Snippets Groups Projects
migrate_users.py 2.86 KiB
Newer Older
  • Learn to ignore specific revisions
  • Dan Braghis's avatar
    Dan Braghis committed
    import argparse
    import json
    
    
    Dan Braghis's avatar
    Dan Braghis committed
    from django.conf import settings
    
    Dan Braghis's avatar
    Dan Braghis committed
    from django.contrib.auth import get_user_model
    
    Dan Braghis's avatar
    Dan Braghis committed
    from django.contrib.auth.models import Group
    
    Dan Braghis's avatar
    Dan Braghis committed
    from django.core.management.base import BaseCommand
    
    from django.db import transaction
    
    Dan Braghis's avatar
    Dan Braghis committed
    from opentech.apply.users.groups import STAFF_GROUP_NAME
    
    
    Dan Braghis's avatar
    Dan Braghis committed
    
    class Command(BaseCommand):
        """Import user records from a JSON export into the Django user model.

        The source file is a JSON object whose keys are stored on each user as
        ``drupal_id`` and whose values are the individual user records.
        """

        help = "User migration script. Requires a source JSON file."

        # Base queryset used to look groups up by name. Querysets are lazy, so
        # declaring it at class level does not query the database on import.
        groups = Group.objects.all()
    
    Dan Braghis's avatar
    Dan Braghis committed
    
        def add_arguments(self, parser):
            """Register the command-line arguments accepted by this command.

            ``source`` is a positional argument that argparse opens for reading;
            ``--anonymize`` is an optional flag that is off unless given.
            """
            source_reader = argparse.FileType('r')
            parser.add_argument(
                'source',
                type=source_reader,
                help="Migration source JSON file",
            )
            parser.add_argument(
                '--anonymize',
                action='store_true',
                help="Anonymizes non-OTF emails",
            )
    
        @transaction.atomic
        def handle(self, *args, **options):
            """Create or update one user per record in the source JSON file.

            The whole import is wrapped in ``transaction.atomic``, so an error on
            any record aborts the run without leaving a partial import behind.

            Options read:
                source: open file object containing a JSON object that maps a
                    uid to a user record.
                anonymize: when true, non-staff email addresses are replaced
                    (see ``get_email``).
            """
            User = get_user_model()

            # The file is only needed for parsing; close it before the import.
            with options['source'] as json_data:
                users = json.load(json_data)

            for uid, user in users.items():
                full_name = self.get_full_name(user)
                user_object, created = User.objects.get_or_create(
                    email=self.get_email(user, options['anonymize']),
                    defaults={
                        'full_name': full_name,
                        'drupal_id': uid,
                    },
                )
                operation = "Imported" if created else "Processed"

                groups = self.get_user_groups(user)
                user_object.groups.set(groups)

                # Ensure uid is set, including on users that already existed
                # and therefore did not receive the ``defaults`` above.
                user_object.drupal_id = uid
                user_object.save()

                self.stdout.write(f"{operation} user {uid} ({full_name})")
    
    Dan Braghis's avatar
    Dan Braghis committed
        def get_full_name(self, user):
            """Return the name to store as ``full_name`` for a user record.

            Uses the first ``safe_value`` of ``field_otf_real_name`` when it is
            present and well-formed, otherwise falls back to ``user['name']``.

            Raises:
                KeyError: if the fallback is needed and the record has no
                    ``name`` key.
            """
            real_name = user.get('field_otf_real_name')
            try:
                # The Drupal data structure includes a language reference.
                # The default is 'und' (undefined).
                return real_name['und'][0]['safe_value']
            except (KeyError, IndexError, TypeError):
                # KeyError: a level of the structure is missing.
                # IndexError: the 'und' list is present but empty.
                # TypeError: the field is None or a list instead of a dict.
                return user['name']
    
        def get_user_groups(self, user):
            """Return the list of ``Group`` objects a user record maps to.

            A user whose ``mail`` is a staff address gets the staff group. Each
            value in the record's ``roles`` mapping that appears in ``role_map``
            adds the corresponding group; other roles (including
            "authenticated user") are ignored.

            Groups that do not exist in the database are skipped with a warning
            on stderr rather than returned as ``None``, which the caller would
            otherwise pass on to ``groups.set()``.
            """
            role_map = {
                'council': 'Reviewer',
                'administrator': 'Editors',
            }

            group_names = []
            if self.is_staff(user['mail']):
                group_names.append(STAFF_GROUP_NAME)

            # ``roles`` may be missing or empty; treat both as "no roles".
            roles = user.get('roles') or {}
            group_names.extend(
                role_map[role] for role in roles.values() if role in role_map
            )

            groups = []
            for group_name in group_names:
                group = self.groups.filter(name=group_name).first()
                if group is None:
                    # NOTE(review): skipping keeps the import running; confirm
                    # that a missing group should not abort the migration.
                    self.stderr.write(
                        f"Group {group_name!r} does not exist; skipping"
                    )
                    continue
                groups.append(group)

            return groups
    
    
        def get_email(self, user, anonymize=False):
            """Return the email address to store for a user record.

            The record's ``mail`` is returned unchanged unless ``anonymize`` is
            true and the address is not a staff address; in that case a
            placeholder address built from the record's ``uid`` is returned.
            """
            email = user['mail']
            # ``is_staff`` is only consulted when anonymization was requested.
            if anonymize and not self.is_staff(email):
                return f"aeon+{user['uid']}@torchbox.com"
            return email
    
    Dan Braghis's avatar
    Dan Braghis committed
    
        def is_staff(self, email):
            """Return True if ``email`` is in one of ``settings.STAFF_EMAIL_DOMAINS``.

            The domain is the part after the last ``@``. Empty values and values
            without an ``@`` are reported as non-staff instead of raising.
            """
            if not email:
                return False

            # rpartition never raises, unlike unpacking ``email.split('@')``,
            # which fails on zero or multiple '@' characters.
            _, separator, email_domain = email.rpartition('@')
            if not separator:
                return False

            return email_domain in settings.STAFF_EMAIL_DOMAINS