Top

youtube module

Module handling communications with the Youtube API and the formatting of received of data

"""
Module handling communications with the Youtube API and the formatting of received of data
"""

from __future__ import print_function
import os
import pickle

from dateutil.parser import *
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow

CLIENT_SECRETS_FILE = "client_secret.json"
SCOPES = ['https://www.googleapis.com/auth/youtube.readonly']
API_SERVICE_NAME = 'youtube'
API_VERSION = 'v3'

client = None


# Getting credentials and storing them
def init():
    """
    Initializes Youtube API client with credentials
    """
    if not os.path.exists('credentials.dat'):
        flow = InstalledAppFlow.from_client_secrets_file(
            CLIENT_SECRETS_FILE, SCOPES)
        credentials = flow.run_local_server(host='localhost',
                                            port=8080,
                                            authorization_prompt_message='Please visit this URL: {url}',
                                            success_message='The auth flow is complete; you may close this window.',
                                            open_browser=True)
        with open('credentials.dat', 'wb') as credentials_dat:
            pickle.dump(credentials, credentials_dat)
    else:
        with open('credentials.dat', 'rb') as credentials_dat:
            credentials = pickle.load(credentials_dat)
    if credentials.expired:
        print("Refreshing youtube API tokens...")
        credentials.refresh(Request())
    global client
    client = build(API_SERVICE_NAME, API_VERSION,
                   credentials=credentials)


# =====================================
#     Custom Youtube api requests
# =====================================

def _playlistitems_list(id, token=None, maxResults=50):
    """

    Youtube API_ endpoint.

    .._API: https://developers.google.com/youtube/v3/docs/playlistItems/list

    :param id: playlist id
    :type id: str
    :param token: youtube API nextPageToken
    :type token: str
    :param maxResults: youtube API maxResults
    :type maxResults: int
    :return: playlistItems json
    """
    return client.playlistItems().list(
        part='snippet',
        maxResults=maxResults,
        playlistId=id,
        pageToken=token
    ).execute()


def _channel_related_playlists(id=None, mine=None):
    """

    Filtered Youtube API_ endpoint.

    .._API: https://developers.google.com/youtube/v3/docs/channels/list

    :param id: channel id
    :type id: str
    :param mine: whether it is the authenticated users channel
    :type mine: bool
    :return: json of related playlists
    """
    return client.channels().list(
        part='contentDetails',
        id=id,
        mine=mine
    ).execute()['items'][0]['contentDetails']['relatedPlaylists']


def _playlists_list(token=None, mine=None, id=None):
    """

    Youtube API_ endpoint.

    .._API: https://developers.google.com/youtube/v3/docs/playlists/list

    :param id: playlist id
    :type id: str
    :param token: youtube API nextPageToken
    :type token: str
    :param mine: whether it is the authenticated users channel
    :type mine: bool
    :return: playlist json
    """
    return client.playlists().list(
        part='snippet',
        mine=mine,
        maxResults=50,
        pageToken=token,
        id=id
    ).execute()


def get_my_playlists():
    """
    Gets current authenticated users playlists
    :return: List of playlists in the form of dict{
    'title':title,
    'id':id
    }
    """

    def get_items():
        return [{
            'title': item['snippet']['title'],
            'id': item['id']
        } for item in response['items']]

    response = _playlists_list(mine=True)
    token = response.get('nextPageToken')
    playlists = get_items()
    while token:
        response = _playlists_list(token, True)
        token = response.get('nextPageToken')
        playlists.extend(get_items())
    return playlists


def get_playlist_items(id, all=False, maxResults=50):
    """
    Gets the items of a playlist
    :return: List of items in the form of dict{
    'title':title,
    'id':id,
    'date':added_to_playlist_date
    }
    """

    def get_items():
        return [{
            'title': item['snippet']['title'],
            'id': item['snippet']['resourceId']['videoId'],
            'date': parse(item['snippet']['publishedAt'])
        } for item in response['items']]

    response = _playlistitems_list(id, maxResults=maxResults)
    token = response.get('nextPageToken')
    videos = get_items()
    while token and all:
        response = _playlistitems_list(id, token, maxResults)
        token = response.get('nextPageToken')
        videos.extend(get_items())
    videos.sort(key=lambda dict: dict['date'], reverse=True)
    return videos


def get_playlist(id):
    """
    Gets a playlist by id
    :return: dict{
    'title':title,
    'id':id
    }
    """
    item = _playlists_list(id=id)['items']
    return {
        'title': item[0]['snippet']['title'],
        'id': item[0]['id']
    }


def get_uploads_playlist(id):
    """
    Gets current authenticated users upload playlist
    :return: dict
    """
    upload_id = _channel_related_playlists(id)['uploads']
    return get_playlist(upload_id)

Module variables

var API_SERVICE_NAME

var API_VERSION

var CLIENT_SECRETS_FILE

var SCOPES

var client

Functions

def get_my_playlists(

)

Gets current authenticated users playlists :return: List of playlists in the form of dict{ 'title':title, 'id':id }

def get_my_playlists():
    """
    Gets current authenticated users playlists
    :return: List of playlists in the form of dict{
    'title':title,
    'id':id
    }
    """

    def get_items():
        return [{
            'title': item['snippet']['title'],
            'id': item['id']
        } for item in response['items']]

    response = _playlists_list(mine=True)
    token = response.get('nextPageToken')
    playlists = get_items()
    while token:
        response = _playlists_list(token, True)
        token = response.get('nextPageToken')
        playlists.extend(get_items())
    return playlists

def get_playlist(

id)

Gets a playlist by id :return: dict{ 'title':title, 'id':id }

def get_playlist(id):
    """
    Gets a playlist by id
    :return: dict{
    'title':title,
    'id':id
    }
    """
    item = _playlists_list(id=id)['items']
    return {
        'title': item[0]['snippet']['title'],
        'id': item[0]['id']
    }

def get_playlist_items(

id, all=False, maxResults=50)

Gets the items of a playlist :return: List of items in the form of dict{ 'title':title, 'id':id, 'date':added_to_playlist_date }

def get_playlist_items(id, all=False, maxResults=50):
    """
    Gets the items of a playlist
    :return: List of items in the form of dict{
    'title':title,
    'id':id,
    'date':added_to_playlist_date
    }
    """

    def get_items():
        return [{
            'title': item['snippet']['title'],
            'id': item['snippet']['resourceId']['videoId'],
            'date': parse(item['snippet']['publishedAt'])
        } for item in response['items']]

    response = _playlistitems_list(id, maxResults=maxResults)
    token = response.get('nextPageToken')
    videos = get_items()
    while token and all:
        response = _playlistitems_list(id, token, maxResults)
        token = response.get('nextPageToken')
        videos.extend(get_items())
    videos.sort(key=lambda dict: dict['date'], reverse=True)
    return videos

def get_uploads_playlist(

id)

Gets current authenticated users upload playlist :return: dict

def get_uploads_playlist(id):
    """
    Gets current authenticated users upload playlist
    :return: dict
    """
    upload_id = _channel_related_playlists(id)['uploads']
    return get_playlist(upload_id)

def init(

)

Initializes Youtube API client with credentials

def init():
    """
    Initializes Youtube API client with credentials
    """
    if not os.path.exists('credentials.dat'):
        flow = InstalledAppFlow.from_client_secrets_file(
            CLIENT_SECRETS_FILE, SCOPES)
        credentials = flow.run_local_server(host='localhost',
                                            port=8080,
                                            authorization_prompt_message='Please visit this URL: {url}',
                                            success_message='The auth flow is complete; you may close this window.',
                                            open_browser=True)
        with open('credentials.dat', 'wb') as credentials_dat:
            pickle.dump(credentials, credentials_dat)
    else:
        with open('credentials.dat', 'rb') as credentials_dat:
            credentials = pickle.load(credentials_dat)
    if credentials.expired:
        print("Refreshing youtube API tokens...")
        credentials.refresh(Request())
    global client
    client = build(API_SERVICE_NAME, API_VERSION,
                   credentials=credentials)