[streamlit] 튜토리얼 > 스트리밍 fragment 시작 및 중지

[출처] https://docs.streamlit.io/develop/tutorials/execution-flow/start-and-stop-fragment-auto-reruns

 

Streamlit Docs

Join the community Streamlit is more than just a way to make data apps, it's also a community of creators that share their apps and ideas and help each other make their work better. Please come join us on the community forum. We love to hear your questions

docs.streamlit.io


1. Intro

Streamlit을 사용하면 함수를 전체 스크립트와 독립적으로 재실행할 수 있는 fragment로 전환할 수 있습니다. 또한 설정된 시간 간격으로 fragment를 재실행하도록 Streamlit에 지시할 수도 있습니다. 이 기능은 데이터 스트리밍이나 프로세스 모니터링에 유용합니다. 사용자가 이 라이브 스트리밍을 시작하고 중지하도록 할 수 있습니다. 이렇게 하려면 프로그래밍 방식으로 fragment의 run_every 파라미터를 설정하세요.

Applied concepts

  • fragment를 사용하여 라이브 데이터를 스트리밍합니다.
  • fragment가 자동으로 다시 실행되지 않도록 시작 및 중지합니다.

Prerequisites

  • 파이썬 환경에는 다음이 설치되어 있어야 합니다: streamlit>=1.37.0
  • your-repository 라는 깨끗한 작업 디렉터리가 있어야 합니다.
  • fragment에 대한 기본적인 이해가 있어야 합니다.

Summary

이 예제에서는 두 개의 데이터 시리즈를 라인 차트로 스트리밍하는 앱을 빌드합니다. 앱은 세션이 처음 로드될 때 최근 데이터를 수집하여 라인 차트를 정적으로 표시합니다. 사이드바에 있는 두 개의 버튼을 통해 사용자는 데이터 스트리밍을 시작하고 중지하여 차트를 실시간으로 업데이트할 수 있습니다. fragment를 사용하여 실시간 업데이트의 빈도와 범위를 관리할 수 있습니다.

구축할 내용을 살펴보세요:

import streamlit as st
import pandas as pd
import numpy as np
from datetime import datetime, timedelta


def get_recent_data(last_timestamp):
    """Generate and return data from last timestamp to now, at most 60 seconds."""
    now = datetime.now()
    if now - last_timestamp > timedelta(seconds=60):
        last_timestamp = now - timedelta(seconds=60)
    sample_time = timedelta(seconds=0.5)  # time between data points
    next_timestamp = last_timestamp + sample_time
    timestamps = np.arange(next_timestamp, now, sample_time)
    sample_values = np.random.randn(len(timestamps), 2)

    data = pd.DataFrame(sample_values, index=timestamps, columns=["A", "B"])
    return data


if "data" not in st.session_state:
    st.session_state.data = get_recent_data(datetime.now() - timedelta(seconds=60))

if "stream" not in st.session_state:
    st.session_state.stream = False


def toggle_streaming():
    st.session_state.stream = not st.session_state.stream


st.title("Data feed")
st.sidebar.slider(
    "Check for updates every: (seconds)", 0.5, 5.0, value=1.0, key="run_every"
)
st.sidebar.button(
    "Start streaming", disabled=st.session_state.stream, on_click=toggle_streaming
)
st.sidebar.button(
    "Stop streaming", disabled=not st.session_state.stream, on_click=toggle_streaming
)

if st.session_state.stream is True:
    run_every = st.session_state.run_every
else:
    run_every = None


@st.fragment(run_every=run_every)
def show_latest_data():
    last_timestamp = st.session_state.data.index[-1]
    st.session_state.data = pd.concat(
        [st.session_state.data, get_recent_data(last_timestamp)]
    )
    st.session_state.data = st.session_state.data[-100:]
    st.line_chart(st.session_state.data)


show_latest_data()


2. Build the example

2.1. Initialize your app

원문 페이지 참고

2.2. Build a function to generate random, recent data

먼저 "A"와 "B"라고 부르는 두 시계열에 대해 일부 데이터를 무작위로 생성하는 함수를 정의합니다. 함수를 복사만 하려는 경우 이 섹션을 건너뛰어도 괜찮습니다.

def get_recent_data(last_timestamp):
    """Generate and return data from last timestamp to now, at most 60 seconds."""
    now = datetime.now()
    if now - last_timestamp > timedelta(seconds=60):
        last_timestamp = now - timedelta(seconds=60)
    sample_time = timedelta(seconds=0.5)  # time between data points
    next_timestamp = last_timestamp + sample_time
    timestamps = np.arange(next_timestamp, now, sample_time)
    sample_values = np.random.randn(len(timestamps), 2)

    data = pd.DataFrame(sample_values, index=timestamps, columns=["A", "B"])
    return data

1. 함수 정의를 시작하세요.

def get_recent_data(last_timestamp):
    """Generate and return data from last timestamp to now, at most 60 seconds."""

가장 최근 데이터 포인트의 타임스탬프를 데이터 생성 함수에 전달합니다. 함수는 이를 사용하여 새 데이터만 반환합니다.

2. 현재 시간을 가져오고 60초 이상 지난 경우 마지막 타임스탬프를 조정합니다.

    now = datetime.now()
    if now - last_timestamp > timedelta(seconds=60):
        last_timestamp = now - timedelta(seconds=60)

마지막 타임스탬프를 업데이트하면 함수가 60초 이상의 데이터를 반환하지 않도록 할 수 있습니다.

3. 새 변수 sample_time을 선언하여 데이터 포인트 사이의 시간을 정의합니다. 첫 번째 새 데이터포인트의 타임스탬프를 계산합니다.

    sample_time = timedelta(seconds=0.5)  # time between data points
    next_timestamp = last_timestamp + sample_time

4. datetime.datetime 인덱스를 만들고 길이가 같은 두 개의 데이터 계열을 생성합니다.

    timestamps = np.arange(next_timestamp, now, sample_time)
    sample_values = np.random.randn(len(timestamps), 2)

5. 데이터 계열을 인덱스와 결합하여 pandas.DataFrame으로 만들고 데이터를 반환합니다.

    data = pd.DataFrame(sample_values, index=timestamps, columns=["A", "B"])
    return data

6. (선택 사항) 함수를 호출하고 데이터를 표시하여 함수를 테스트합니다.

data = get_recent_data(datetime.now() - timedelta(seconds=60))
data

app.py 파일을 저장하여 미리 보기를 확인합니다. 완료되면 이 두 줄을 삭제합니다.

2.3. Initialize Session State values for your app

@st.fragment()의 run_every 파라미터를 동적으로 변경하게 되므로 fragment 함수를 정의하기 전에 관련 변수와 세션 상태 값을 초기화해야 합니다. fragment 함수는 세션 상태의 값도 읽고 업데이트하므로 지금 정의하면 fragment 함수를 더 쉽게 이해할 수 있습니다.

1. 세션에서 첫 번째 앱 로드를 위해 데이터를 초기화합니다.

if "data" not in st.session_state:
    st.session_state.data = get_recent_data(datetime.now() - timedelta(seconds=60))

앱은 사용자가 데이터 스트리밍을 시작하기 전에 이 초기 데이터를 정적 라인 차트에 표시합니다.

2. 세션 상태의 '스트림'을 초기화하여 스트리밍을 켜고 끕니다. 기본값은 꺼짐(False)으로 설정합니다.

if "stream" not in st.session_state:
    st.session_state.stream = False

3. 콜백 함수를 만들어 'stream'을 True나 False로 전환합니다.

def toggle_streaming():
    st.session_state.stream = not st.session_state.stream

4. 앱에 제목을 추가합니다.

st.title("Data feed")

5. 사이드바에 슬라이더를 추가하여 스트리밍 중 데이터 확인 빈도를 설정할 수 있습니다.

st.sidebar.slider(
    "Check for updates every: (seconds)", 0.5, 5.0, value=1.0, key="run_every"
)

6. 사이드바에 버튼을 추가하여 스트리밍을 켜고 끌 수 있습니다.

st.sidebar.button(
    "Start streaming", disabled=st.session_state.stream, on_click=toggle_streaming
)
st.sidebar.button(
    "Stop streaming", disabled=not st.session_state.stream, on_click=toggle_streaming
)

두 함수 모두 동일한 콜백을 사용하여 세션 상태의 "stream"을 토글합니다. 버튼 중 하나를 비활성화하려면 현재 값 "stream"을 사용합니다. 이렇게 하면 버튼이 항상 현재 상태와 일치하도록 " Start streaming"은 스트리밍이 꺼져 있을 때만 클릭할 수 있고 " Stop streaming"는 스트리밍이 켜져 있을 때만 클릭할 수 있습니다. 또한 버튼은 사용자가 사용할 수 있는 동작을 강조 표시하여 사용자에게 상태를 제공합니다.

7. 조각 함수의 자동 재실행 여부(및 속도)를 결정할 새 변수 run_every를 생성하고 설정합니다.

if st.session_state.stream is True:
    run_every = st.session_state.run_every
else:
    run_every = None

2.4. Build a fragment function to stream data

사용자가 데이터 스트리밍을 켜고 끌 수 있도록 하려면 @st.fragment() 데코레이터에서 run_every 파라미터를 설정해야 합니다.

@st.fragment(run_every=run_every)
def show_latest_data():
    last_timestamp = st.session_state.data.index[-1]
    st.session_state.data = pd.concat(
        [st.session_state.data, get_recent_data(last_timestamp)]
    )
    st.session_state.data = st.session_state.data[-100:]
    st.line_chart(st.session_state.data)

1. @st.fragment 데코레이터를 사용하여 함수 정의를 시작하세요.

 @st.fragment(run_every=run_every)
 def show_latest_data():

위에서 선언한 run_every 변수를 사용하여 같은 이름의 매개변수를 설정합니다.

2. 세션 상태의 마지막 데이터 포인트의 타임스탬프를 검색합니다.

    last_timestamp = st.session_state.data.index[-1]

3. 세션 상태의 데이터를 업데이트하고 마지막 100개의 타임스탬프만 유지하도록 트리밍합니다.

    st.session_state.data = pd.concat(
        [st.session_state.data, get_recent_data(last_timestamp)]
    )
    st.session_state.data = st.session_state.data[-100:]

4. 데이터를 꺾은선형 차트로 표시합니다.

    st.line_chart(st.session_state.data)

fragment 함수 정의가 완료되었습니다.

2.5. Call and test out your fragment function

1. 코드 하단에서 함수를 호출합니다.

show_latest_data()

2. "스트리밍 시작"을 클릭하여 앱을 테스트합니다. 업데이트 빈도를 조정해 보세요.

2.6. Next steps

데이터 생성 빈도나 세션 상태에 보관되는 데이터의 양을 조정해 보세요. get_recent_data 내에서 위젯으로 sample_time을 설정해 보세요. st.plotly_chart 또는 st.altair_chart를 사용하여 차트에 레이블을 추가해 보세요.