Guides
«Kiss My Stat!» P.2 AmoCRM API + ClickHouse
Дима Родин
Сторож в Digital God
Получение данных, обработка, визуализация. Да, да ETL )
  1. Скринкаст: получение ключа API
  2. Авторизация и выполнение запросов к API
  3. Получение информации об аккаунте
  4. Получение информации о сделках
  5. Преобразование в удобный формат
  6. Создание схемы ClickHouse вручную
  7. Автоматическое построение таблицы ClickHouse на основе имеющихся данных
  8. Управление базой данных через сервис CHWriter в RockStat
  9. Построение примитивного Dashboard в Grafana на основе данных из AmoCRM, хранящихся в ClickHouse
  10. Простой способ обработки перезаписи в ClickHouse
  11. Ачивка: асинхронное программирование в Python
  12. Ачивка: создание сервиса, обновляющего данные в ClickHouse по расписанию
  13. Скринкаст: создание и запуск сервиса
    Оповещения
    Анонсы материалов руководств в канале @kissmystat. Новости и оповещения в канале @digitalgods, дополнительная
    Вопросы и общение
    Обсудить задачи из руководств можно в TG @kissmystats. Отдельный чат для мероприятий @digitalgodhub
    Сервисный бот
    Схема проезда, feedback, запросы доступа, регистрация на  мероприятия у бота Алены в TG @digitalgodbot
    В руководстве используется ClickHouse, Jupyter, Grafana и другие инструменты. Они все имеются в стандартной установке Rockstat. Если вы не читали предыдущие руководства, где произовдилась установка Rockstat, сделайте это при помощи видео-руководства установка Rockstat в Google Cloud.

    Установка зависимостей

    Для начала установим набор полезных функция для вывода данных от Дмитрия Родина

    In [1]:
    # набор полезных функций для вывода данных
    !pip install -U  --user git+https://github.com/madiedinro/rodin_helpers_py -q
    import rodin_helpers as rh
    
    In [2]:
    !pip install -U --user arrow
    !pip install -U --user python-dotenv
    
    Requirement already up-to-date: arrow in /home/user/.local/lib/python3.7/site-packages (0.15.6)
    Requirement already satisfied, skipping upgrade: python-dateutil in /opt/conda/lib/python3.7/site-packages (from arrow) (2.8.0)
    Requirement already satisfied, skipping upgrade: six>=1.5 in /opt/conda/lib/python3.7/site-packages (from python-dateutil->arrow) (1.12.0)
    Requirement already up-to-date: python-dotenv in /home/user/.local/lib/python3.7/site-packages (0.13.0)
    
    In [3]:
    # Стандатные классы даты и времени
    from datetime import datetime
    # Модуль для с методами для итеративной обработки данных
    from itertools import count
    # Вывод данные в структурированном виде
    from pprint import pprint
    # Стандартный пакет для работы с json
    import json
    
    # альтернативный пакет для работы с данными
    import arrow
    
    # библиотека для работы с http
    import requests
    

    Установка констант

    Записываем необходимые сведения в переменные:

    • amo_key в настройках AmoCRM, домен и юзера вы знаете.
    • amo_domain и amo_user вам известны

    Подготовим словарь, в котором будет храниться статус.Словарь используется т.к. из функции нелья установить переменную за ее границами. Редактировать словарь можно.

    Берем в настройках ключ API, указывает другиме данные которые нам уже известны

    In [5]:
    amo_domain = 'https://randomint.amocrm.ru'
    # логин AmoCRM
    amo_user = 'randomint@armyspy.com'
    # ключ API
    amo_key = 'ea961728df7452688bcd8fe3cf8e3cb9e8c39e11'
    
    today = arrow.now().format('YYYY-MM-DD')
    state = {
        'cookies': None
    }
    

    Поехали!

    Авторизация в AmoCRM

    Подготовим функцию, которая принмает в аргументах логин и ключ. Дока по авторизации https://www.amocrm.ru/developers/content/api/auth

    In [6]:
    def auth(user, user_hash):
        url = amo_domain + '/private/api/auth.php'
        data = {
            'USER_LOGIN': user, 
            'USER_HASH': user_hash
        }
        res = requests.post(url, data=data, params={'type':'json'})
    
        print('status code', res.status_code)
        
        if res.status_code == 200:
            state['cookies'] = res.cookies
            return res.json()
    
    auth_result = auth(amo_user, amo_key)
    pprint(auth_result)
    
    status code 200
    {
      'response': {
        'accounts': [
          {
            'id': 24126727,
            'language': 'ru',
            'name': 'randomint',
            'subdomain': 'randomint',
            'timezone': 'Europe/Moscow'
          }
        ],
        'auth': True,
        'server_time': 1592506276,
        'user': {'id': 3094597, 'language': 'ru'}
      }
    }
    

    Если 200 то все супер. Если нет - ищем ошибку

    Запрос параметра аккаунта

    Запросим параметры аккаунта, где находится информация о номерах статусов. Дока по API аккаунта https://www.amocrm.ru/developers/content/api/account

    In [7]:
    res = requests.get(amo_domain + '/api/v2/account', params={'with': 'pipelines,groups,note_types,task_types'}, cookies=state['cookies'])
    if res.status_code == 200:
        data = res.json()
    
    pprint(data)
    
    {
      '_embedded': {
        'groups': [{'id': 0, 'name': 'Отдел продаж'}],
        'note_types': {
          '1': {
            'code': 'DEAL_CREATED',
            'id': 1,
            'is_editable': False
          },
          '10': {
            'code': 'CALL_IN',
            'id': 10,
            'is_editable': False
          },
          '101': {
            'code': 'DROPBOX',
            'id': 101,
            'is_editable': False
          },
          '102': {
            'code': 'SMS_IN',
            'id': 102,
            'is_editable': False
          },
          '103': {
            'code': 'SMS_OUT',
            'id': 103,
            'is_editable': False
          },
          '11': {
            'code': 'CALL_OUT',
            'id': 11,
            'is_editable': False
          },
          '12': {
            'code': 'COMPANY_CREATED',
            'id': 12,
            'is_editable': False
          },
          '13': {
            'code': 'TASK_RESULT',
            'id': 13,
            'is_editable': False
          },
          '17': {
            'code': 'CHAT',
            'id': 17,
            'is_editable': False
          },
          '2': {
            'code': 'CONTACT_CREATED',
            'id': 2,
            'is_editable': False
          },
          '3': {
            'code': 'DEAL_STATUS_CHANGED',
            'id': 3,
            'is_editable': False
          },
          '4': {
            'code': 'COMMON',
            'id': 4,
            'is_editable': True
          },
          '5': {
            'code': 'ATTACHEMENT',
            'id': 5,
            'is_editable': False
          },
          '6': {
            'code': 'CALL',
            'id': 6,
            'is_editable': False
          },
          '7': {
            'code': 'MAIL_MESSAGE',
            'id': 7,
            'is_editable': False
          },
          '8': {
            'code': 'MAIL_MESSAGE_ATTACHMENT',
            'id': 8,
            'is_editable': False
          },
          '9': {
            'code': 'EXTERNAL_ATTACH',
            'id': 9,
            'is_editable': False
          },
          '99': {
            'code': 'MAX_SYSTEM',
            'id': 99,
            'is_editable': False
          }
        },
        'pipelines': {
          '1588552': {
            '_links': {
              'self': {'href': '/api/v2/pipelines?id=1588552', 'method': 'get'}
            },
            'id': 1588552,
            'is_main': True,
            'name': 'Воронка',
            'sort': 1,
            'statuses': {
              '142': {
                'color': '#CCFF66',
                'id': 142,
                'is_editable': False,
                'name': 'Успешно реализовано',
                'sort': 10000
              },
              '143': {
                'color': '#D5D8DB',
                'id': 143,
                'is_editable': False,
                'name': 'Закрыто и не реализовано',
                'sort': 11000
              },
              '24126733': {
                'color': '#c1c1c1',
                'id': 24126733,
                'is_editable': False,
                'name': 'Неразобранное',
                'sort': 10
              },
              '24126736': {
                'color': '#99ccff',
                'id': 24126736,
                'is_editable': True,
                'name': 'Первичный контакт',
                'sort': 20
              },
              '24126739': {
                'color': '#ffff99',
                'id': 24126739,
                'is_editable': True,
                'name': 'Переговоры',
                'sort': 30
              },
              '24126742': {
                'color': '#ffcc66',
                'id': 24126742,
                'is_editable': True,
                'name': 'Принимают решение',
                'sort': 40
              },
              '24126745': {
                'color': '#ffcccc',
                'id': 24126745,
                'is_editable': True,
                'name': 'Согласование договора',
                'sort': 50
              }
            }
          }
        },
        'task_types': {
          '1': {
            'color': None,
            'icon_id': 0,
            'id': 1,
            'name': 'Звонок'
          },
          '2': {
            'color': None,
            'icon_id': 0,
            'id': 2,
            'name': 'Встреча'
          }
        }
      },
      '_links': {
        'self': {
          'href': '/api/v2/account?with=pipelines,groups,note_types,task_types',
          'method': 'get'
        }
      },
      'currency': 'RUB',
      'current_user': 3094597,
      'date_pattern': {
        'date': 'd.m.Y',
        'date_time': 'd.m.Y H:i',
        'time': 'H:i',
        'time_full': 'H:i:s'
      },
      'id': 24126727,
      'language': 'ru',
      'name': 'randomint',
      'subdomain': 'randomint',
      'timezone': 'Europe/Moscow',
      'timezone_offset': '+03:00'
    }
    

    А теперь попрбуем воспользоваться функцией из модуля rodin_helpers, который мы загрузили в самом начале.

    In [8]:
    rh.walk(data, limit_list=1)
    
    [dict  
    [dict  _links
    [dict  _links > self
    |      _links > self > href=/api/v2/account?with=pipelines,groups,note_types,task_types
    |      _links > self > method=get
    |      id=24126727
    |      name=randomint
    |      subdomain=randomint
    |      currency=RUB
    |      timezone=Europe/Moscow
    |      timezone_offset=+03:00
    |      language=ru
    [dict  date_pattern
    |      date_pattern > date=d.m.Y
    |      date_pattern > time=H:i
    |      date_pattern > date_time=d.m.Y H:i
    |      date_pattern > time_full=H:i:s
    |      current_user=3094597
    [dict  _embedded
    [dict  _embedded > note_types
    [dict  _embedded > note_types > 1
    |      _embedded > note_types > 1 > id=1
    |      _embedded > note_types > 1 > code=DEAL_CREATED
    |      _embedded > note_types > 1 > is_editable=False
    [dict  _embedded > note_types > 2
    |      _embedded > note_types > 2 > id=2
    |      _embedded > note_types > 2 > code=CONTACT_CREATED
    |      _embedded > note_types > 2 > is_editable=False
    [dict  _embedded > note_types > 3
    |      _embedded > note_types > 3 > id=3
    |      _embedded > note_types > 3 > code=DEAL_STATUS_CHANGED
    |      _embedded > note_types > 3 > is_editable=False
    [dict  _embedded > note_types > 4
    |      _embedded > note_types > 4 > id=4
    |      _embedded > note_types > 4 > code=COMMON
    |      _embedded > note_types > 4 > is_editable=True
    [dict  _embedded > note_types > 5
    |      _embedded > note_types > 5 > id=5
    |      _embedded > note_types > 5 > code=ATTACHEMENT
    |      _embedded > note_types > 5 > is_editable=False
    [dict  _embedded > note_types > 6
    |      _embedded > note_types > 6 > id=6
    |      _embedded > note_types > 6 > code=CALL
    |      _embedded > note_types > 6 > is_editable=False
    [dict  _embedded > note_types > 7
    |      _embedded > note_types > 7 > id=7
    |      _embedded > note_types > 7 > code=MAIL_MESSAGE
    |      _embedded > note_types > 7 > is_editable=False
    [dict  _embedded > note_types > 8
    |      _embedded > note_types > 8 > id=8
    |      _embedded > note_types > 8 > code=MAIL_MESSAGE_ATTACHMENT
    |      _embedded > note_types > 8 > is_editable=False
    [dict  _embedded > note_types > 9
    |      _embedded > note_types > 9 > id=9
    |      _embedded > note_types > 9 > code=EXTERNAL_ATTACH
    |      _embedded > note_types > 9 > is_editable=False
    [dict  _embedded > note_types > 10
    |      _embedded > note_types > 10 > id=10
    |      _embedded > note_types > 10 > code=CALL_IN
    |      _embedded > note_types > 10 > is_editable=False
    [dict  _embedded > note_types > 11
    |      _embedded > note_types > 11 > id=11
    |      _embedded > note_types > 11 > code=CALL_OUT
    |      _embedded > note_types > 11 > is_editable=False
    [dict  _embedded > note_types > 12
    |      _embedded > note_types > 12 > id=12
    |      _embedded > note_types > 12 > code=COMPANY_CREATED
    |      _embedded > note_types > 12 > is_editable=False
    [dict  _embedded > note_types > 13
    |      _embedded > note_types > 13 > id=13
    |      _embedded > note_types > 13 > code=TASK_RESULT
    |      _embedded > note_types > 13 > is_editable=False
    [dict  _embedded > note_types > 17
    |      _embedded > note_types > 17 > id=17
    |      _embedded > note_types > 17 > code=CHAT
    |      _embedded > note_types > 17 > is_editable=False
    [dict  _embedded > note_types > 99
    |      _embedded > note_types > 99 > id=99
    |      _embedded > note_types > 99 > code=MAX_SYSTEM
    |      _embedded > note_types > 99 > is_editable=False
    [dict  _embedded > note_types > 101
    |      _embedded > note_types > 101 > id=101
    |      _embedded > note_types > 101 > code=DROPBOX
    |      _embedded > note_types > 101 > is_editable=False
    [dict  _embedded > note_types > 102
    |      _embedded > note_types > 102 > id=102
    |      _embedded > note_types > 102 > code=SMS_IN
    |      _embedded > note_types > 102 > is_editable=False
    [dict  _embedded > note_types > 103
    |      _embedded > note_types > 103 > id=103
    |      _embedded > note_types > 103 > code=SMS_OUT
    |      _embedded > note_types > 103 > is_editable=False
    [list  _embedded > groups
    [dict  _embedded > groups > 0
    |      _embedded > groups > 0 > id=0
    |      _embedded > groups > 0 > name=Отдел продаж
    [dict  _embedded > task_types
    [dict  _embedded > task_types > 1
    |      _embedded > task_types > 1 > id=1
    |      _embedded > task_types > 1 > name=Звонок
    |      _embedded > task_types > 1 > color=None
    |      _embedded > task_types > 1 > icon_id=0
    [dict  _embedded > task_types > 2
    |      _embedded > task_types > 2 > id=2
    |      _embedded > task_types > 2 > name=Встреча
    |      _embedded > task_types > 2 > color=None
    |      _embedded > task_types > 2 > icon_id=0
    [dict  _embedded > pipelines
    [dict  _embedded > pipelines > 1588552
    |      _embedded > pipelines > 1588552 > id=1588552
    |      _embedded > pipelines > 1588552 > name=Воронка
    |      _embedded > pipelines > 1588552 > sort=1
    |      _embedded > pipelines > 1588552 > is_main=True
    [dict  _embedded > pipelines > 1588552 > statuses
    [dict  _embedded > pipelines > 1588552 > statuses > 24126733
    |      _embedded > pipelines > 1588552 > statuses > 24126733 > id=24126733
    |      _embedded > pipelines > 1588552 > statuses > 24126733 > name=Неразобранное
    |      _embedded > pipelines > 1588552 > statuses > 24126733 > color=#c1c1c1
    |      _embedded > pipelines > 1588552 > statuses > 24126733 > sort=10
    |      _embedded > pipelines > 1588552 > statuses > 24126733 > is_editable=False
    [dict  _embedded > pipelines > 1588552 > statuses > 24126736
    |      _embedded > pipelines > 1588552 > statuses > 24126736 > id=24126736
    |      _embedded > pipelines > 1588552 > statuses > 24126736 > name=Первичный контакт
    |      _embedded > pipelines > 1588552 > statuses > 24126736 > color=#99ccff
    |      _embedded > pipelines > 1588552 > statuses > 24126736 > sort=20
    |      _embedded > pipelines > 1588552 > statuses > 24126736 > is_editable=True
    [dict  _embedded > pipelines > 1588552 > statuses > 24126739
    |      _embedded > pipelines > 1588552 > statuses > 24126739 > id=24126739
    |      _embedded > pipelines > 1588552 > statuses > 24126739 > name=Переговоры
    |      _embedded > pipelines > 1588552 > statuses > 24126739 > color=#ffff99
    |      _embedded > pipelines > 1588552 > statuses > 24126739 > sort=30
    |      _embedded > pipelines > 1588552 > statuses > 24126739 > is_editable=True
    [dict  _embedded > pipelines > 1588552 > statuses > 24126742
    |      _embedded > pipelines > 1588552 > statuses > 24126742 > id=24126742
    |      _embedded > pipelines > 1588552 > statuses > 24126742 > name=Принимают решение
    |      _embedded > pipelines > 1588552 > statuses > 24126742 > color=#ffcc66
    |      _embedded > pipelines > 1588552 > statuses > 24126742 > sort=40
    |      _embedded > pipelines > 1588552 > statuses > 24126742 > is_editable=True
    [dict  _embedded > pipelines > 1588552 > statuses > 24126745
    |      _embedded > pipelines > 1588552 > statuses > 24126745 > id=24126745
    |      _embedded > pipelines > 1588552 > statuses > 24126745 > name=Согласование договора
    |      _embedded > pipelines > 1588552 > statuses > 24126745 > color=#ffcccc
    |      _embedded > pipelines > 1588552 > statuses > 24126745 > sort=50
    |      _embedded > pipelines > 1588552 > statuses > 24126745 > is_editable=True
    [dict  _embedded > pipelines > 1588552 > statuses > 142
    |      _embedded > pipelines > 1588552 > statuses > 142 > id=142
    |      _embedded > pipelines > 1588552 > statuses > 142 > name=Успешно реализовано
    |      _embedded > pipelines > 1588552 > statuses > 142 > color=#CCFF66
    |      _embedded > pipelines > 1588552 > statuses > 142 > sort=10000
    |      _embedded > pipelines > 1588552 > statuses > 142 > is_editable=False
    [dict  _embedded > pipelines > 1588552 > statuses > 143
    |      _embedded > pipelines > 1588552 > statuses > 143 > id=143
    |      _embedded > pipelines > 1588552 > statuses > 143 > name=Закрыто и не реализовано
    |      _embedded > pipelines > 1588552 > statuses > 143 > color=#D5D8DB
    |      _embedded > pipelines > 1588552 > statuses > 143 > sort=11000
    |      _embedded > pipelines > 1588552 > statuses > 143 > is_editable=False
    [dict  _embedded > pipelines > 1588552 > _links
    [dict  _embedded > pipelines > 1588552 > _links > self
    |      _embedded > pipelines > 1588552 > _links > self > href=/api/v2/pipelines?id=1588552
    |      _embedded > pipelines > 1588552 > _links > self > method=get
    

    Не знаю как вам, а мне куда нагляднее. У меня номер статуса "Успешно реализовано" = 142

    In [9]:
    success_num = 142
    

    Запрос информации о лидах

    При запросе можно передавать, лиды с какими статусами нам нужны. Мы выберем те, что находятся на завершительном этапе воронки

    Тут скрыт, важный и интересный кусочек руководства. Чтобы открыть потребуется выполнить задание! Но сейчас не выйдет :(
    У нас трудности с API Facebook, а оно необходимо для отслеживания выполнения задания :(

    Слишком много информации. Создадим функцию которая будет превращать здоровенный объект сделки в небольшой и подходящий нам полезный объектик. Заодно избавимся от пути _embedded > items.

    In [11]:
    orig_deals = data['_embedded']['items']
    
    In [12]:
    def format_deal(deal):
        fields = {f['name'].lower(): f['values'][0].get('value') for f in deal['custom_fields']}
        date = datetime.fromtimestamp(deal['closed_at'],)
        return {
            'id': deal['id'],
            'uid': fields.get('uid') or '',
            'cid': fields.get('cid') or '',
            'sale': deal['sale'],
            'date': date.strftime('%Y-%m-%d'),
            'date_time': date.strftime('%Y-%m-%d %H:%M:%S'),
            'account_id': deal['account_id']
        }
    
    In [13]:
    deal = format_deal(orig_deals[0])
    pprint(deal)
    
    {
      'account_id': 24126727,
      'cid': '943617990.1549300918',
      'date': '2019-02-10',
      'date_time': '2019-02-10 18:47:43',
      'id': 6082997,
      'sale': 0,
      'uid': ''
    }
    

    Так гораздо лучше. Применяем ко всем.

    In [14]:
    deals = [format_deal(d) for d in orig_deals]
    

    Запись в ClickHouse. Создание таблицы

    Мы рассмотрим несколько вариантов, как это сделать.

    Вариант 1. Воспользовавшись магическими возможностями SimpleCH

    Речь идет про библиотеку для работы с ClickHouse SimpleCH, детали можно найти в репозитории с библиотекой https://github.com/madiedinro/simple-clickhouse.

    Возможности новые и экспериментальные, могут работать не во всех случаях. Но страшно удобные!

    In [15]:
    from simplech import ClickHouse
    

    Создание экземпляра класса для работы с ClickHouse

    In [16]:
    ch = ClickHouse()
    

    вызов ch.discovery создаст экземпляр TableDiscovery, где сразу будет произведена обработка имеющихся данных и подобрана схема хранения

    In [17]:
    td = ch.discover('deals', deals)
    td
    
    Out[17]:
    <Instance of TableDiscovery class, value=table=None date_field=None index_granularity=8192 columns={'id': <class 'simplech.types.Int64'>, 'uid': <class 'simplech.types.String'>, 'cid': <class 'simplech.types.String'>, 'sale': <class 'simplech.types.Int64'>, 'date': <class 'simplech.types.Date'>, 'date_time': <class 'simplech.types.DateTime'>, 'account_id': <class 'simplech.types.Int64'>} idx=None metrics_set=set() metrics={} dimensions_set={'sale', 'account_id', 'id', 'date_time', 'cid', 'uid', 'date'} dimensions={'sale': <class 'simplech.types.Int64'>, 'account_id': <class 'simplech.types.Int64'>, 'id': <class 'simplech.types.Int64'>, 'date_time': <class 'simplech.types.DateTime'>, 'cid': <class 'simplech.types.String'>, 'uid': <class 'simplech.types.String'>, 'date': <class 'simplech.types.Date'>}>

    Видно, что все поля записались в dimensions. Надо только отметить какие из них метрики и выделить дату и первичный ключ

    In [18]:
    td.date('date').idx('account_id', 'date').metrics('sale')
    
    Out[18]:
    <Instance of TableDiscovery class, value=table=None date_field='date' index_granularity=8192 columns={'id': <class 'simplech.types.Int64'>, 'uid': <class 'simplech.types.String'>, 'cid': <class 'simplech.types.String'>, 'sale': <class 'simplech.types.Int64'>, 'date': <class 'simplech.types.Date'>, 'date_time': <class 'simplech.types.DateTime'>, 'account_id': <class 'simplech.types.Int64'>} idx=['account_id', 'date'] metrics_set={'sale'} metrics={'sale': <class 'simplech.types.Int64'>} dimensions_set={'account_id', 'id', 'date_time', 'cid', 'uid', 'date'} dimensions={'account_id': <class 'simplech.types.Int64'>, 'id': <class 'simplech.types.Int64'>, 'date_time': <class 'simplech.types.DateTime'>, 'cid': <class 'simplech.types.String'>, 'uid': <class 'simplech.types.String'>, 'date': <class 'simplech.types.Date'>}>

    Посмотреть какая схема хранения данных получилась

    In [19]:
    schema = td.merge_tree()
    print(schema)
    
    CREATE TABLE IF NOT EXISTS `deals` (
      `id`  Int64,
      `uid`  String,
      `cid`  String,
      `sale`  Int64,
      `date`  Date,
      `date_time`  DateTime,
      `account_id`  Int64
    ) ENGINE MergeTree() PARTITION BY toYYYYMM(`date`) ORDER BY (`account_id`, `date`) SETTINGS index_granularity=8192
    
    

    То что нужно!

    Тут скрыт, важный и интересный кусочек руководства. Чтобы открыть потребуется выполнить задание! Но сейчас не выйдет :(
    У нас трудности с API Facebook, а оно необходимо для отслеживания выполнения задания :(

    Вариант 2. Классический способ создания таблицы и записи данных

    In [24]:
    ch.run("""
    CREATE TABLE IF NOT EXISTS deals (
      date Date,
      date_time DateTime,
      id UInt64,
      account_id UInt64,
      uid String,
      cid String,
      sale Int64
    ) ENGINE = MergeTree() PARTITION BY toYYYYMM(date) ORDER BY (id, date) SETTINGS index_granularity=8192
    """)
    

    Тишина, значит все окей!

    Запишем наши данные в таблицу ClickHouse

    Продобнее о библиотеке https://github.com/madiedinro/simple-clickhouse

    In [64]:
    from simplech import ClickHouse
    ch = ClickHouse()
    
    with ch.table('deals') as c:
        for deal in deals:
            c.push(deal)
    

    Что у нас вышло

    In [65]:
    rh.print_rows([*ch.objects_stream('SELECT * FROM deals LIMIT 10')])
    
    account_id cid date date_time id sale uid
    24126727 943617990.1549300918 2019-02-10 2019-02-10 18:47:43 6082997 0 -
    24126727 - 2019-02-10 2019-02-10 18:47:53 5837903 0 -
    24126727 - 2019-02-10 2019-02-10 18:47:53 5837903 0 -
    24126727 943617990.1549300918 2019-02-10 2019-02-10 18:47:43 6082997 0 -
    24126727 943617990.1549300918 2019-02-10 2019-02-10 18:47:43 6082997 0 -
    24126727 - 2019-02-10 2019-02-10 18:47:53 5837903 0 -
    23668636 69296758.1541614448 2019-04-05 2019-04-05 01:25:51 3600804 59760 6450101900745225911
    23668636 69296758.1541614448 2019-04-05 2019-04-05 01:25:51 3600804 -59760 6450101900745225911
    23668636 69296758.1541721201 2019-04-06 2019-04-06 01:25:51 10302700 62715 6450101900747412685
    23668636 69296758.1544900888 2019-04-06 2019-04-06 01:25:51 4945072 31041 6450101900747304139
    23668636 69296758.1545375731 2019-04-06 2019-04-06 01:25:51 6420181 68441 6450101900748221436
    23668636 69296758.1544900888 2019-04-06 2019-04-06 01:25:51 4945072 -31041 6450101900747304139
    23668636 69296758.1541721201 2019-04-06 2019-04-06 01:25:51 10302700 -62715 6450101900747412685
    23668636 69296758.1545375731 2019-04-06 2019-04-06 01:25:51 6420181 -68441 6450101900748221436
    23668636 69296758.1547682674 2019-04-07 2019-04-07 01:25:51 2241093 45164 6450101900743640007
    23668636 69296758.1545507119 2019-04-07 2019-04-07 01:25:51 4155669 60772 6450101900741383093
    23668636 69296758.1541460549 2019-04-07 2019-04-07 01:25:51 1974986 30230 6450101900746127262
    23668636 69296758.1545507119 2019-04-07 2019-04-07 01:25:51 4155669 -60772 6450101900741383093
    23668636 69296758.1541460549 2019-04-07 2019-04-07 01:25:51 1974986 -30230 6450101900746127262
    23668636 69296758.1547682674 2019-04-07 2019-04-07 01:25:51 2241093 -45164 6450101900743640007
    23668636 69296758.1542115493 2019-04-08 2019-04-08 01:25:51 2389867 25956 6450101900745710163
    23668636 69296758.1543841903 2019-04-08 2019-04-08 01:25:51 5427677 56769 6450101900746261670
    23668636 69296758.1544788706 2019-04-08 2019-04-08 01:25:51 4739210 38225 6450101900747065508
    23668636 69296758.1542115493 2019-04-08 2019-04-08 01:25:51 2389867 -25956 6450101900745710163
    23668636 69296758.1544788706 2019-04-08 2019-04-08 01:25:51 4739210 -38225 6450101900747065508
    23668636 69296758.1543841903 2019-04-08 2019-04-08 01:25:51 5427677 -56769 6450101900746261670
    23668636 69296758.1542392799 2019-04-09 2019-04-09 01:25:51 2830406 51227 6450101900743843155
    23668636 69296758.1547887321 2019-04-09 2019-04-09 01:25:51 3277930 47730 6450101900747291768
    23668636 69296758.1542442456 2019-04-09 2019-04-09 01:25:51 5293891 44692 6450101900746846249
    23668636 69296758.1542442456 2019-04-09 2019-04-09 01:25:51 5293891 -44692 6450101900746846249
    23668636 69296758.1542392799 2019-04-09 2019-04-09 01:25:51 2830406 -51227 6450101900743843155
    23668636 69296758.1547887321 2019-04-09 2019-04-09 01:25:51 3277930 -47730 6450101900747291768
    23668636 69296758.1547391154 2019-04-11 2019-04-11 01:25:51 6082693 49644 6450101900742384102
    23668636 69296758.1543933355 2019-04-11 2019-04-11 01:25:51 4754368 43436 6450101900747484481
    23668636 69296758.1547391154 2019-04-11 2019-04-11 01:25:51 6082693 -49644 6450101900742384102
    23668636 69296758.1543933355 2019-04-11 2019-04-11 01:25:51 4754368 -43436 6450101900747484481
    In [67]:
    print(ch.select('SHOW TABLES'))
    
    activity
    amocrm_deals
    bot_feedback
    cookiesync
    crossstat
    deals
    dr19_vk
    events
    facebook_wh
    ga
    ga_stat
    ga_visits
    logs
    migrations
    raw_ga
    shortcuts
    tablename
    tablename2
    telegram
    test_activity
    test_events
    test_stat
    test_write
    vk_stat
    webhooks
    wh_zadarma
    wh_ztrack
    ym_test
    ym_visits
    
    

    Если дубликадов не удается избежать. Иструкция DISTINCT как раз этим занимается,но будте аккуратны, не должно попасть ни одна колонка где данные могут отличаться!

    In [68]:
    query = """
    SELECT DISTINCT id, uid, cid, date
    FROM deals
    """
    
    data = [*ch.objects_stream(query)]
    
    rh.print_rows(data)
    
    cid date id uid
    943617990.1549300918 2019-02-10 6082997 -
    - 2019-02-10 5837903 -
    69296758.1541614448 2019-04-05 3600804 6450101900745225911
    69296758.1541721201 2019-04-06 10302700 6450101900747412685
    69296758.1544900888 2019-04-06 4945072 6450101900747304139
    69296758.1545375731 2019-04-06 6420181 6450101900748221436
    69296758.1547682674 2019-04-07 2241093 6450101900743640007
    69296758.1545507119 2019-04-07 4155669 6450101900741383093
    69296758.1541460549 2019-04-07 1974986 6450101900746127262
    69296758.1542115493 2019-04-08 2389867 6450101900745710163
    69296758.1543841903 2019-04-08 5427677 6450101900746261670
    69296758.1544788706 2019-04-08 4739210 6450101900747065508
    69296758.1542392799 2019-04-09 2830406 6450101900743843155
    69296758.1547887321 2019-04-09 3277930 6450101900747291768
    69296758.1542442456 2019-04-09 5293891 6450101900746846249
    69296758.1547391154 2019-04-11 6082693 6450101900742384102
    69296758.1543933355 2019-04-11 4754368 6450101900747484481

    Вариант 3. Управление БД через сервис CHWriter

    Самый простой способ описать ее в конфиге chwriter, он создаст ее и в дальнейшем расширит, при необходимости. Для этого в Theia создайте файл config/custom/chwriter/amocrm.yml с содержимым:

    clickhouse:
      tables:
        amocrm_deals:
          date: Date
          date_time: DateTime
          id: UInt64
          uid: String
          cid: String
          sale: Int64
          account_id: Int64
          _options:
            engine: MergeTree() PARTITION BY toYYYYMM(date) ORDER BY (id, date) SETTINGS index_granularity=8192
    

    В дальнейшем можно будет просто добавить необходимые колонки и они будут автоматически созданы при перезагрузке chwriter.

    In [69]:
    print(ch.select('SHOW TABLES'))
    
    activity
    amocrm_deals
    bot_feedback
    cookiesync
    crossstat
    deals
    dr19_vk
    events
    facebook_wh
    ga
    ga_stat
    ga_visits
    logs
    migrations
    raw_ga
    shortcuts
    tablename
    tablename2
    telegram
    test_activity
    test_events
    test_stat
    test_write
    vk_stat
    webhooks
    wh_zadarma
    wh_ztrack
    ym_test
    ym_visits
    
    

    Самостоятельно

    Это только начало пути. Многое можно сделать лучше, но перед этим надо озаботиться обработкой по датам. Поэтому вот вам задачка, поработать самостоятельно:

    • Продумать как за какие даты получать данные
    • Сделать передачу дат в запрос сделок
    • Мы специально воспользовались самым обычным движком MergeTree, для других еще рановато. Поэтому можете попробовать сделать чтение данных из БД перед тем как писать. Писать только те сделки, которых нет.

    Дашборд в Grafana

    В дополнение содадим простенький дашборд в Grafana, показывающий кол-во завершившихся сделок по дням. Но чтобы было, что показывать давайте сгененрируем немного фейковых данных. Для этого нам потребуется генератор случайных чисел, импортируем его и сгенерируем необходимое кол-во сделок.

    In [31]:
    # Импортируем модуль генерации случайных чисел
    from random import randint
    # Сегодня
    today = arrow.now()
    # Сколько дней назад будем смещаться
    use_days = 7
    # Сколько сделок хоти
    deals_amount = 15
    # Список фэйков сделок
    fake_deals = []
    
    Тут скрыт, важный и интересный кусочек руководства. Чтобы открыть потребуется выполнить задание! Но сейчас не выйдет :(
    У нас трудности с API Facebook, а оно необходимо для отслеживания выполнения задания :(
    In [32]:
    # при помощи среза (slice) берем только несколько записей для вывода, чтобы не засорять горизонт
    pprint(fake_deals[:3])
    
    [
      {
        'account_id': 23668636,
        'cid': '69296758.1542392799',
        'date': '2019-04-09',
        'date_time': '2019-04-09 01:25:51',
        'id': 2830406,
        'sale': 51227,
        'uid': '6450101900743843155'
      },
      {
        'account_id': 23668636,
        'cid': '69296758.1541614448',
        'date': '2019-04-05',
        'date_time': '2019-04-05 01:25:51',
        'id': 3600804,
        'sale': 59760,
        'uid': '6450101900745225911'
      },
      {
        'account_id': 23668636,
        'cid': '69296758.1542115493',
        'date': '2019-04-08',
        'date_time': '2019-04-08 01:25:51',
        'id': 2389867,
        'sale': 25956,
        'uid': '6450101900745710163'
      }
    ]
    
    In [33]:
    # Пишем в кх
    with ch.table('deals') as b:
        for d in fake_deals:
            b.push(d)
    
    In [34]:
    rh.print_rows([*ch.objects_stream('SELECT * FROM deals LIMIT 10')], limit=10)
    
    account_id cid date date_time id sale uid
    24126727 943617990.1549300918 2019-02-10 2019-02-10 18:47:43 6082997 0 -
    24126727 - 2019-02-10 2019-02-10 18:47:53 5837903 0 -
    24126727 - 2019-02-10 2019-02-10 18:47:53 5837903 0 -
    24126727 943617990.1549300918 2019-02-10 2019-02-10 18:47:43 6082997 0 -
    23668636 69296758.1541614448 2019-04-05 2019-04-05 01:25:51 3600804 59760 6450101900745225911
    23668636 69296758.1541721201 2019-04-06 2019-04-06 01:25:51 10302700 62715 6450101900747412685
    23668636 69296758.1544900888 2019-04-06 2019-04-06 01:25:51 4945072 31041 6450101900747304139
    23668636 69296758.1545375731 2019-04-06 2019-04-06 01:25:51 6420181 68441 6450101900748221436
    23668636 69296758.1547682674 2019-04-07 2019-04-07 01:25:51 2241093 45164 6450101900743640007
    23668636 69296758.1545507119 2019-04-07 2019-04-07 01:25:51 4155669 60772 6450101900741383093
    23668636 69296758.1541460549 2019-04-07 2019-04-07 01:25:51 1974986 30230 6450101900746127262

    Теперь графана

    Видео, как создавать панели с графиками в Grafana

    Автоматизация сбора статистики, сервисы

    Jupyter - это замечательный инструмент для написания кусочкой кода и отладки, но нам надо сделать инструмент, собираюзий данные по расписанию, а не блокнот. Поэтому превратим наш код в код сервиса.

    Rockstat, как площадка для экспериментов и запуска микросервисов

    Как зарегистрировать сервер и развернуть на нем Rockstat в части Zero

    aiohttp - асинхронный http сервер и клиентв

    https://aiohttp.readthedocs.io/en/stable/client_quickstart.html

    Асинхронное программирование

    Пара статей и курс на эту тему. Асинхронность является специфической областью занинй для большинства python программиств

    Через некоторое время запросы перестанут работать и будут выдавать

    {'response': {'error': 'Неверный логин или пароль', 'error_code': '110', 'ip': '88.212.240.252', 'domain': 'wevdiw.amocrm.ru', 'server_time': 1547687325}}
    

    Почему-то серверы АМО забывают сессию, и нужно опять опять авторизоваться.

    Следует доработать функцию выполнения запросов, чтобы она обрабатывала ошибки авторизации и производила повторный запрос. Этот же алгоритм обработает ситуацию, когда нужно авторизоваться после перезапуска (потеряны значения переменных)

    Далее функция, которая собрана из всего, что было ранее

    In [50]:
    def amo_query(path, params=None, data=None, json=None, method='get', attemps=2, auth_request=False):
        # По-умолчанию высталено 2 прохождения цикла.
        # На первом может произойти ошибка авторизации,
        # за ней последует процесс авторизации.
        # На втором проходе запрос будет выполнен с корретными привелегиями
        for i in range(attemps):
            # Стоим урл запроса и записываем в переменную
            url = amo_domain + path
            # базовые параметры, которые дописываются ко всем запросам
            base_params = {'type':'json'}
            # Выполнение запроса
            res = requests.request(method, url, data=data, json=json, cookies=state['cookies'], params={**base_params, **(params or {})})
            # если сервер вернул ответ что все ок
            if res.status_code == 200:
                # Сохраняем куки после успешной авторизации
                if auth_request:
                    state['cookies'] = res.cookies
                return res.json()
            # В случае ошибки авторизации
            elif res.status_code == 401:
                # Если это не авторизационный запрос
                if auth_request == False:
                    req_data = { 'USER_LOGIN': amo_user,  'USER_HASH': amo_key }
                    # Вызываем запрос авторизации
                    auth_result = amo_query('/private/api/auth.php', data=req_data, auth_request=True, method='post')
            print(res.status_code, res.json())
            
            
    amo_query('/api/v2/account')
    
    401 {'response': {'error': 'Неверный логин или пароль', 'error_code': '110', 'ip': '88.212.240.252', 'domain': 'randomint.amocrm.ru', 'server_time': 1554946093}}
    
    Out[50]:
    {'_links': {'self': {'href': '/api/v2/account?type=json', 'method': 'get'}},
     'id': 24126727,
     'name': 'randomint',
     'subdomain': 'randomint',
     'currency': 'RUB',
     'timezone': 'Europe/Moscow',
     'timezone_offset': '+03:00',
     'language': 'ru',
     'date_pattern': {'date': 'd.m.Y',
      'time': 'H:i',
      'date_time': 'd.m.Y H:i',
      'time_full': 'H:i:s'},
     'current_user': 3094597}

    Все работает. А теперь заменим библиотеку для выполнения запросов на aiohttp, она требуется для асинхронной работы (об этом потом). Метод помечен async. Для вызовы анинхронных методов используется слово await. В requests и aiohttp немного отличается работа с cookies, остальные параметры совпадают.

    Немного асинхронного программирования

    Возможно для кого-то сразу станте понятно, когда я скажу, что у asyncio точно такой же смысл, как и у node.js. В питоне есть несколько реализаций асинхронности, но самая быстрая (возможно и популярная в продакшн) это uvloop, которая кстати построена на той же самай библиотеке, что и node.js libuv Попробую показать наглядно в чем отличие.

    In [38]:
    import asyncio
    # Примочка, чтобы asyncio корректно работал jupyter
    import nest_asyncio
    nest_asyncio.apply()
    
    In [39]:
    async def tick01():
        for i in range(5):
            await asyncio.sleep(0.1)
            print(0.1)
        return {'a': 111}
    
    async def tick02():
        for i in range(5):
            await asyncio.sleep(0.2)
            print(0.2)
        return {'b': 222}
    
    In [40]:
    # Loop - корневой внутренний цикл, выполняющий поочередно все функции
    loop = asyncio.get_event_loop()
    # Все, что там выполняется это задачи
    task = asyncio.gather(tick01(), tick02())
    # Запускаем его до тех пор, пока задачи на закончатся
    loop.run_until_complete(task)
    
    0.1
    0.2
    0.1
    0.1
    0.2
    0.1
    0.1
    0.2
    0.2
    0.2
    
    Out[40]:
    [{'a': 111}, {'b': 222}]

    Если присмотреться, то можно заметить что функции выполнялись параллельно, и потом вместе вернули результат. Но самое главное, это не многопоточность и нет проблема возникающих при использовании потоков. Асинхронность это когда в одном потоке функции выполнябтся частями. Переключение происзодит в момент ожидания await. А учитывая что большую часть времени программы стоят в ожидании это открыло огромные возможности. Например Node.js это целиком асинхнорнный код, там крайне проблематично писать что-то синхронное, при этом один процесс может обработкать несколько тысяч запросов в секунду.

    Отличия в записи и выполнении функций

    обычная функция выглядит так

    def myfunc(a):
        print(a)
    

    и выполнять ее вот так

    myfunc(1)
    

    асинхронная функция

    async def myfunc(a):
        print(a)
    

    и выполнять ее вот так

    await myfunc(1)
    

    В случае использования операторов контекста и/или генараторов вместо await используеться async with / async for. Не забивайте голову просто дописывайте и ориентируйтесь по примерам, сейчас не время в этом разбираться

    Отличия в работе с http

    В асинхронной стороне питоне не используюьт requests, а используют aiohttp. Аргументы практически идентичны, только появляется дополнительный слой сессии. Рассмотрим пример

    res = requests.get(url, data=data, json=json, cookies=state['cookies'])
    res.json()
    

    Эквивалентом этого запроса будет

    async with aiohttp.ClientSession(cookie_jar=state['cookies']) as sess:
        async with sess.get(url, data=data, json=json) as res:
            res = await res.json()
    

    Да, запись получается немного шире, но

    In [51]:
    state = {'cookies': None}
    
    In [52]:
    import aiohttp
    
    async def amo_query(path, params=None, data=None, json=None, method='get', attemps=2, auth_request=False):
        # По-умолчанию высталено 2 прохождения цикла.
        # На первом может произойти ошибка авторизации,
        # за ней последует процесс авторизации.
        # На втором проходе запрос будет выполнен с корретными привелегиями
        for i in range(attemps):
            # Стоим урл запроса и записываем в переменную
            url = amo_domain + path
            # базовые параметры, которые дописываются ко всем запросам
            base_params = {'type':'json'}
            # Выполнение запроса
            async with aiohttp.ClientSession(cookie_jar=state['cookies']) as session:
                async with session.request(method, url, data=data, json=json, params={**base_params, **(params or {})}) as res:
                    #res = requests.request()
                    # если сервер вернул ответ что все ок
                    if res.status == 200:
                        # Сохраняем куки после успешной авторизации
                        if auth_request:
                            state['cookies'] = session.cookie_jar
                        return await res.json()
                    # В случае ошибки авторизации
                    elif res.status == 401:
                        # Если это не авторизационный запрос
                        if auth_request == False:
                            req_data = { 'USER_LOGIN': amo_user,  'USER_HASH': amo_key }
                            # Вызываем запрос авторизации
                            auth_result = await amo_query('/private/api/auth.php', data=req_data, auth_request=True, method='post')
                    print(res.status, await res.json())
    
    In [ ]:
    # Протестируем функцию
    await amo_query('/api/v2/account', params={'status':success_num})
    
    401 {'response': {'error': 'Неверный логин или пароль', 'error_code': '110', 'ip': '88.212.240.252', 'domain': 'randomint.amocrm.ru', 'server_time': 1554946105}}
    
    Out[ ]:
    {'_links': {'self': {'href': '/api/v2/account?type=json&status=142',
       'method': 'get'}},
     'id': 24126727,
     'name': 'randomint',
     'subdomain': 'randomint',
     'currency': 'RUB',
     'timezone': 'Europe/Moscow',
     'timezone_offset': '+03:00',
     'language': 'ru',
     'date_pattern': {'date': 'd.m.Y',
      'time': 'H:i',
      'date_time': 'd.m.Y H:i',
      'time_full': 'H:i:s'},
     'current_user': 3094597}

    Преренесем код в сервис Rockstat, чтобы он выполнялся по расписанию. Но предварительно подготовим здесь небходимые функции.

    Конфигурация таблицы

    Чтобы использовать систему презаписи надо сделать фиксированную конфигурацию, не зависящую от пришедших данных. Воспользуемся еще одним крайне полезным методом SimpleCH

    In [54]:
    code = td.pycode()
    print(code)
    
    td_deals = ch.discover('deals', columns={
            'id': 'Int64', 
            'uid': 'String', 
            'cid': 'String', 
            'sale': 'Int64', 
            'date': 'Date', 
            'date_time': 'DateTime', 
            'account_id': 'Int64'})\
        .metrics('sale')\
        .dimensions('id', 'cid', 'uid', 'date_time', 'account_id', 'date')\
        .date('date')\
        .idx('account_id', 'date')
    
    

    Отлично, все работает! Теперь сделаем полноценный обработчик записи данных (точнее дозаписи - библиотека сравнивает что есть в БД с тем, что передается на обработку)

    In [55]:
    async def write_deals(deals):
        
        td = ch.discover('deals', columns={'id': 'Int64', 'uid': 'Int64', 'cid': 'String', 'sale': 'Int64', 'date': 'Date', 'date_time': 'DateTime', 'account_id': 'Int64'}).metrics(*['sale']).dimensions(*['date_time', 'account_id', 'cid', 'uid', 'id', 'date']).date(*['date']).idx(*['account_id', 'date'])
        today = arrow.now().format('YYYY-MM-DD')
    
        async with td.difference('2019-01-01', today, deals) as d:
            async for row in d:
                td.push(row)
    

    Соберем в одном месте конфигурацию и необходимые функции.

    In [59]:
    import arrow
    from simplech import AsyncClickHouse
    from datetime import datetime
    
    # --- Параметря подключения к Amo
    amo_domain = 'https://randomint.amocrm.ru'
    # логин AmoCRM
    amo_user = 'randomint@armyspy.com'
    # ключ API
    amo_key = 'ea961728df7452688bcd8fe3cf8e3cb9e8c39e11'
    
    success_num = 142
    state = {'cookies': None}
    
    # Обатите внимание, вместо ClickHouse мы используем асинхронная AsyncClickHouse (неблокирующая версия) модуля
    ch = AsyncClickHouse()
    
    # Структура таблицы
    td_deals = ch.discover('deals', columns={
        'id': 'Int64', 
        'uid': 'Int64', 
        'cid': 'String', 
        'sale': 'Int64', 
        'date': 'Date', 
        'date_time': 'DateTime', 
        'account_id': 'Int64'
    }).metrics(
        'sale'
    ).date('date').idx('account_id', 'date')
    
    
    def format_deal(deal):
        fields = {f['name'].lower(): f['values'][0].get('value') for f in deal['custom_fields']}
        date = datetime.fromtimestamp(deal['closed_at'],)
        return {
            'id': deal['id'],
            'uid': fields.get('uid') or '',
            'cid': fields.get('cid') or '',
            'sale': deal['sale'],
            'date': date.strftime('%Y-%m-%d'),
            'date_time': date.strftime('%Y-%m-%d %H:%M:%S'),
            'account_id': deal['account_id']
        }
    
    
    async def write_deals(deals, d1, d2):
        # Создание таблицы
        await td_deals.merge_tree(execute=True)
        # Подсчет дельты записей
        async with td_deals.difference(d1, d2, deals) as d:
            async for row in d:
                td_deals.push(row)
    
    # Создадим функцию, которая будет производить весь цикл сбора данных, вызывая другие ф-ции
    async def update_data():
        d1 = '2019-01-01'
        d2 = arrow.now().format('YYYY-MM-DD')
        # TODO: Не забудьте сделать обработку дат при получении из апи и записи в БД!
        deals_response = await amo_query('/api/v2/leads', params={'status': success_num})
        deals = [format_deal(d) for d in deals_response['_embedded']['items']]
        await write_deals(deals, d1, d2)
    

    Не забудьте самостоятельно сделать обработку дат

    In [ ]:
    # Произведем полный получения и сохранения данных
    await update_data()
    
    401 {'response': {'error': 'Неверный логин или пароль', 'error_code': '110', 'ip': '88.212.240.252', 'domain': 'randomint.amocrm.ru', 'server_time': 1554946355}}
    
    In [ ]:
    rh.print_rows([rec async for rec in ch.objects_stream('SELECT * FROM deals')], limit=10)
    
    account_id cid date date_time id sale uid
    23668636 69296758.1541614448 2019-04-05 2019-04-05 01:25:51 3600804 59760 6450101900745225911
    23668636 69296758.1541721201 2019-04-06 2019-04-06 01:25:51 10302700 62715 6450101900747412685
    23668636 69296758.1544900888 2019-04-06 2019-04-06 01:25:51 4945072 31041 6450101900747304139
    23668636 69296758.1545375731 2019-04-06 2019-04-06 01:25:51 6420181 68441 6450101900748221436
    23668636 69296758.1547682674 2019-04-07 2019-04-07 01:25:51 2241093 45164 6450101900743640007
    23668636 69296758.1545507119 2019-04-07 2019-04-07 01:25:51 4155669 60772 6450101900741383093
    23668636 69296758.1541460549 2019-04-07 2019-04-07 01:25:51 1974986 30230 6450101900746127262
    23668636 69296758.1542115493 2019-04-08 2019-04-08 01:25:51 2389867 25956 6450101900745710163
    23668636 69296758.1543841903 2019-04-08 2019-04-08 01:25:51 5427677 56769 6450101900746261670
    23668636 69296758.1544788706 2019-04-08 2019-04-08 01:25:51 4739210 38225 6450101900747065508
    23668636 69296758.1542392799 2019-04-09 2019-04-09 01:25:51 2830406 51227 6450101900743843155
    In [ ]:
    rh.print_rows([rec async for rec in ch.objects_stream('SELECT * FROM deals')], limit=10)
    
    account_id cid date date_time id sale uid
    23668636 69296758.1541614448 2019-04-05 2019-04-05 01:25:51 3600804 59760 6450101900745225911
    23668636 69296758.1541721201 2019-04-06 2019-04-06 01:25:51 10302700 62715 6450101900747412685
    23668636 69296758.1544900888 2019-04-06 2019-04-06 01:25:51 4945072 31041 6450101900747304139
    23668636 69296758.1545375731 2019-04-06 2019-04-06 01:25:51 6420181 68441 6450101900748221436
    23668636 69296758.1547682674 2019-04-07 2019-04-07 01:25:51 2241093 45164 6450101900743640007
    23668636 69296758.1545507119 2019-04-07 2019-04-07 01:25:51 4155669 60772 6450101900741383093
    23668636 69296758.1541460549 2019-04-07 2019-04-07 01:25:51 1974986 30230 6450101900746127262
    23668636 69296758.1542115493 2019-04-08 2019-04-08 01:25:51 2389867 25956 6450101900745710163
    23668636 69296758.1543841903 2019-04-08 2019-04-08 01:25:51 5427677 56769 6450101900746261670
    23668636 69296758.1544788706 2019-04-08 2019-04-08 01:25:51 4739210 38225 6450101900747065508
    23668636 69296758.1542392799 2019-04-09 2019-04-09 01:25:51 2830406 51227 6450101900743843155

    Все записалось, а также не записываются дубликаты при повторной обработке. Переносим все это в сервис

    Создаем сервис в Theia

    Ну все, сервис запущен, радуемся постоянно поступающему потоку данных.

    На этом все. Ждите новых серий :)


    In [ ]:
     
    
    In [ ]:
     
    
    Оповещения
    Анонсы материалов руководств в канале @kissmystat. Новости и оповещения в канале @digitalgods, дополнительная
    Вопросы и общение
    Обсудить задачи из руководств можно в TG @kissmystats. Отдельный чат для мероприятий @digitalgodhub
    Сервисный бот
    Схема проезда, feedback, запросы доступа, регистрация на  мероприятия у бота Алены в TG @digitalgodbot