Skip to content
Snippets Groups Projects
migrate_users.py 3.18 KiB
Newer Older
Dan Braghis's avatar
Dan Braghis committed
import argparse
import json

Dan Braghis's avatar
Dan Braghis committed
from django.conf import settings
Dan Braghis's avatar
Dan Braghis committed
from django.contrib.auth import get_user_model
Dan Braghis's avatar
Dan Braghis committed
from django.contrib.auth.models import Group
Dan Braghis's avatar
Dan Braghis committed
from django.core.management.base import BaseCommand
from django.db import transaction
Dan Braghis's avatar
Dan Braghis committed
from opentech.apply.users.groups import STAFF_GROUP_NAME

Dan Braghis's avatar
Dan Braghis committed

class Command(BaseCommand):
    """Import users from a JSON export into the project's user model.

    The source file must contain a JSON object whose keys are the legacy
    user ids (stored as ``drupal_id``) and whose values are per-user
    records. The fields read from each record are ``mail``, ``name``,
    ``uid``, ``roles``, ``field_otf_real_name`` and
    ``field_otf_slack_name``.
    """

    help = "User migration script. Requires a source JSON file."

    # Lazy queryset; it is only evaluated when filtered in get_user_groups().
    groups = Group.objects.all()

    def add_arguments(self, parser):
        """Register the positional source file and the ``--anonymize`` flag."""
        parser.add_argument('source', type=argparse.FileType('r'), help="Migration source JSON file")
        parser.add_argument('--anonymize', action='store_true', help="Anonymizes non-OTF emails")

    @transaction.atomic
    def handle(self, *args, **options):
        """Create or update one user per record in the source file.

        Users are matched on e-mail address. New users are created with the
        full name, slack name and legacy id from the record; existing users
        get their slack name and legacy id refreshed. Group membership is
        replaced with the groups computed by ``get_user_groups``. The whole
        import runs in a single transaction, so a failure rolls back every
        change made by this run.
        """
        # The file is only needed for parsing; release it before the DB work.
        with options['source'] as json_data:
            users = json.load(json_data)

        User = get_user_model()
        anonymize = options['anonymize']

        for uid, user in users.items():
            full_name = self.get_full_name(user)
            slack_name = self.get_slack_name(user)

            user_object, created = User.objects.get_or_create(
                email=self.get_email(user, anonymize),
                defaults={
                    'full_name': full_name,
                    'slack': slack_name,
                    'drupal_id': uid,
                },
            )
            operation = "Imported" if created else "Processed"

            user_object.groups.set(self.get_user_groups(user))

            # Allow for updating the slack user name.
            if not created:
                user_object.slack = slack_name

            # Ensure uid is set
            user_object.drupal_id = uid
            user_object.save()

            self.stdout.write(f"{operation} user {uid} ({full_name})")

    def get_full_name(self, user):
        """Return the user's real name, falling back to the account name.

        ``field_otf_real_name`` is used when it is a mapping containing
        ``safe_value``; any other shape (missing, ``None``, a list, ...)
        falls back to ``user['name']``.

        Raises:
            KeyError: if the fallback is needed and ``name`` is absent.
        """
        full_name = user.get('field_otf_real_name', None)
        try:
            full_name = full_name['safe_value']
        except (KeyError, TypeError):
            full_name = user['name']

        return full_name

    def get_slack_name(self, user):
        """Return the slack handle prefixed with ``@``, or ``''`` if unset.

        ``field_otf_slack_name`` is used when it is a mapping containing
        ``safe_value``; any other shape yields an empty string.
        """
        slack_name = user.get('field_otf_slack_name', None)
        try:
            slack_name = f"@{slack_name['safe_value']}"
        except (KeyError, TypeError):
            slack_name = ''

        return slack_name

    def get_user_groups(self, user):
        """Return the list of ``Group`` objects the user should belong to.

        Staff (by e-mail domain) get the staff group. Every role other than
        "authenticated user" is translated through ``role_map``; roles with
        no mapping are ignored. Group names that do not exist in the
        database are skipped instead of producing ``None`` entries.
        """
        # NOTE(review): no role -> group entries are visible in this mapping;
        # confirm against the canonical file whether entries are expected.
        role_map = {}

        group_names = []
        if self.is_staff(user['mail']):
            group_names.append(STAFF_GROUP_NAME)

        # A missing or null ``roles`` entry is treated as "no roles".
        roles = user.get('roles') or {}
        for role in roles.values():
            if role == "authenticated user":
                continue
            group_name = role_map.get(role)
            if group_name:
                group_names.append(group_name)

        if not group_names:
            return []

        # One query for all names; non-existent groups simply do not match.
        return list(self.groups.filter(name__in=group_names))

    def get_email(self, user, anonymize=False):
        """Return the e-mail address to store for ``user``.

        The real address is returned unless ``anonymize`` is true and the
        address is not a staff address, in which case a placeholder built
        from the record's ``uid`` is returned.
        """
        email = user['mail']
        if not anonymize or self.is_staff(email):
            return email

        return f"aeon+{user['uid']}@torchbox.com"

    def is_staff(self, email):
        """Return True if the e-mail's domain is in ``STAFF_EMAIL_DOMAINS``.

        The domain is everything after the last ``@``. An empty value or an
        address without ``@`` is never staff (rather than raising).
        """
        _, separator, email_domain = (email or '').rpartition('@')
        if not separator:
            return False
        return email_domain in settings.STAFF_EMAIL_DOMAINS