论坛抽奖程序公布

转载 如何发起一场透明公开且科学合理的抽奖

原文:https://www.nodeseek.com/post-92-1
作者:酒神
前段时间办了一场论坛内的小型抽奖活动,让我再次想到了一直琢磨的一个问题:怎样的抽奖才是完善的抽奖呢,能够保证举办方和参与者都认同,降低暗箱操作的可能性呢。

先上结论,个人认为一个好的抽奖活动应该具备以下几点核心要素:

  • 结果不可提前预知,也就是主办方和用户都无从知晓
  • 结果可以复现,公布的抽奖结果必须能够按照既定算法复现
  • 教育成本和操作流程不能太过复杂,最好能够自动化或者通过点击实现

几种典型的反面教材:

  • 到了开奖时间或者开奖前,提前用某个骰子生成器抽几个出来,违背了结果不可复现
  • 用压缩包加密压缩提前生成的楼层,先公布压缩包,开奖后公布密码,违背了结果不可提前预知(主办方知道中奖楼层)
  • 用各种加密与秘钥交换算法生成一套算法出来,网站方与发帖人同时持有必须的秘钥之一,违背了成本/流程原则

其实在举办此活动之前,我一直想在论坛上直接加入抽奖功能,流程上大概是主题帖可以选择性附加一个抽奖活动,设定奖池大小/奖品数目/开奖时间等,然后论坛帮忙完成后续的自动化流程即可。但是思来想去总是没有好的方案出来,期间也搜索了一些现有的方案逛了逛知乎和v2ex寻找灵感,终究是不能得偿所愿。最后能想到的办法要么教育用户其原理所需的成本太高(原理复杂讲不清),要么流程过于复杂。

逛LES论坛发现了一个很不错的抽奖方法,稍加改造就用到了本次活动中,讲一下原理背景及操作流程:

熵联盟

由Cloudflare、洛桑联邦理工学院(EPFL)、智利大学、Kudelski Security等众多权威机构主导的一个去中心化的随机信标(random beacon),根据一系列复杂的技术原理保证随机的可靠性,对原理感兴趣的可以点击这个扩展阅读。这个随机信标每30s生成一个随机字符串,大概长这样:

afa0dd948208c2f1d0ee62c3f121d60d6e702576544815118f1639d165cfc593

你可以访问https://api.drand.sh/来获取任意周期(每30s一个周期)的随机信标结果,具体的开发者手册见这里。我们知道在随机取样过程中,虽然取样算法各有不同,但是只要保证所用伪随机数的种子一样,那么取样结果就是固定的。

image.png

诶嘿,你发现了没有,这个随机信标就很适合当随机种子呀。剩下的事情就简单了,我们只需要获取开奖时刻所在周期的随机信标作为随机种子,然后用一个公开的抽样算法来取样就可以了。Cloudflare Beacon还有个优点是可以查询任何历史周期的随机信标结果,保证了你抽奖之后任何时间回头检查都可以知道有没有作弊。
image.png

随机取样过程

取样算法的设计就比较多了,一般都是自己固定一套流程出来,保证可以复现就行了。为了偷懒我用了random.org的随机取样器,他也是支持用户输入随机种子的:

image.png

这里注意,整个取样过程实际上是个全排列过程,比方说到截止时间共有233个楼层,则生成1-233楼层的一个全排列,主办方根据自己的规定依次校核结果序列中每个楼层是否符合要求,比如去掉重复发帖和格式不符等等,这样就可以得到最终的中奖人员。
image.png

简单的自动化js脚本

讲了这么多其实就是一句话,用CF Beacon做随机种子,然后对楼层做全排列依次校核中奖人员即可。
那么有没有简化流程的全自动js脚本呢,应该是比较好写的,这里举个例子,读者可以根据自己的要求改动:

let openTime = 'Fri Dec 02 2022 12:00:00 GMT+0800 (China Standard Time)' // 开奖时间
let maxFloor = 233 // 最大楼层

let beaconRound = (new Date(openTime).getTime() / 1000 - 1595431050) / 30
const MAINCHAIN = '8990e7a9aaed2ffed73dbd7092123d6f289930540d7651336225dc172e51b2ce'
let beaconUrl = `https://api.drand.sh/${MAINCHAIN}/public/${beaconRound}`
fetch(beaconUrl)
    .then(r => r.json())
    .then(r => {
        let seed = r.randomness
        let randomOrgUrl = `https://www.random.org/sequences/?min=1&max=${maxFloor}&col=1&format=plain&rnd=id.${seed}`
        return fetch(randomOrgUrl)
            .then(r => r.text())
            .then(r => {
                console.log(`中奖楼层为${r.replace(/\n/g, ', ')}`)
            })
    })

按照这个原理写了个抽奖程序,可以保证在主题帖确定,抽奖结束时间确定,正确答案确定的情况下,中奖者一定不变。而抽奖结束时间,也就是投票结束时间是在投票发贴的时候就必须确定的,过了发帖后的5分钟,管理员也无法修改。因为生成的是有资格参与抽奖者的全排列,所以我们按照帖子内描述奖品的顺序发放。

import requests
import csv
from datetime import datetime

right_answer = ("超腾讯视频10H")
target_post = 26886  # 主题帖ID
headers = {
    "Content-Type": "application/json"
}


def get_topic_data():
    topic_url = f"https://bb.zlb.ink/t/topic/{target_post}.json"
    try:
        response = requests.get(topic_url, timeout=10, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        raise Exception(f"获取主题数据失败: {e}")


def find_poll_data(topic_data):
    posts = topic_data.get("post_stream", {}).get("posts", [])
    original_post = next((p for p in posts if p.get("post_number") == 1), None)
    if not original_post:
        raise Exception("未找到原帖")

    post_id = original_post.get("id")
    if not post_id:
        raise Exception("原帖缺失ID")

    polls = original_post.get("polls")
    if not polls or not isinstance(polls, list):
        raise Exception("原帖未找到投票数据")

    # 返回关闭时间、post_id 和 poll
    return post_id, polls, original_post.get("closed")


def find_correct_option(polls):
    close_time_str = None
    correct_option_id = None
    poll_name =None
    # 查找正确答案选项(包含HTML标签和完整内容)
    for poll in polls:
        # 获取关闭时间(使用第一个找到的poll的关闭时间)
        if not poll_name:
            poll_name = poll.get("name")
        if not close_time_str:
            close_time_str = poll.get("close")

        # 查找正确选项
        for option in poll.get("options", []):
            if right_answer in option.get("html", ""):
                correct_option_id = option.get("id")

    if not close_time_str:
        raise Exception("未找到投票关闭时间")
    if not correct_option_id:
        raise Exception(f"未找到匹配'{right_answer}'的正确选项")

    return poll_name, close_time_str, correct_option_id


def export_voters(post_id, poll_name, option_id):

    params = {
        "post_id": post_id,
        "poll_name": poll_name,
        "option_id": option_id,
        "page": 1,
        "limit": 50
    }
    base_url = "https://bb.zlb.ink/polls/voters.json"

    seen_voter_ids = set()
    collected_voters = []  # 用于收集所有投票者
    total_voters_processed = 0
    unique_voters_count = 0
    print("开始获取投票数据...")
    while True:
        print(f"正在请求第 {params['page']} 页数据...")
        try:
            response = requests.get(base_url, headers=headers, params=params, timeout=10)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"请求失败:{e}")
            break
        try:
            data = response.json()
        except ValueError:
            print(f"无法解析 JSON 响应: {response.text[:200]}...")
            break
        if not data:
            print("响应数据为空")
            break
        voters_data = data.get("voters")
        if voters_data is None:
            print("'voters' 数据为 null,导出完成")
            break
        voter_list = voters_data.get(str(option_id), [])
        if not voter_list:
            print(f"第 {params['page']} 页无数据,导出完成")
            break
        print(f"第 {params['page']} 页获取到 {len(voter_list)} 条记录")

        for voter in voter_list:
            total_voters_processed += 1
            voter_id = voter.get('id')

            if voter_id and voter_id not in seen_voter_ids:
                seen_voter_ids.add(voter_id)
                collected_voters.append(voter)  # 收集用户数据
                unique_voters_count += 1
        # 检查是否有下一页
        if len(voter_list) < params['limit']:
            print("已到达最后一页")
            break

        params["page"] += 1

    # 按ID排序
    collected_voters = sorted(collected_voters, key=lambda x: int(x.get('id', 0)))

    print(f"处理了 {total_voters_processed} 条记录")
    print(f"找到 {unique_voters_count} 个唯一用户")
    return collected_voters, unique_voters_count


def lottery_draw(close_time_str, max_floor):
    print(f"\n开始抽奖流程...")
    print(f"关闭时间: {close_time_str}")
    print(f"参与抽奖用户数: {max_floor}")

    dt = datetime.fromisoformat(close_time_str.replace('Z', '+00:00'))

    # 计算beaconRound
    origin_timestamp = 1595431050
    beacon_round = int((dt.timestamp() - origin_timestamp) // 30)
    print(f"计算的 Beacon 轮次: {beacon_round}")

    # 固定主链值
    MAINCHAIN = '8990e7a9aaed2ffed73dbd7092123d6f289930540d7651336225dc172e51b2ce'
    beacon_url = f'https://api.drand.sh/{MAINCHAIN}/public/{beacon_round}'

    # 获取随机种子
    try:
        beacon_response = requests.get(beacon_url, timeout=10).json()
        seed = beacon_response.get('randomness')
        if not seed:
            raise Exception(f"API响应中未找到随机种子: {beacon_response}")
        print(f"获取到随机种子: {seed[:20]}...")
    except Exception as e:
        raise Exception(f"获取随机种子失败: {e}")

    # 生成随机数序列
    random_org_url = f'https://www.random.org/sequences/?min=1&max={max_floor}&col=1&format=plain&rnd=id.{seed}'

    try:
        response = requests.get(random_org_url, timeout=10)
        response.raise_for_status()
        random_sequence = response.text.strip().split("\n")
        print(f"成功获取随机数序列")
    except Exception as e:
        raise Exception(f"获取随机数序列失败: {e}")

    return [int(num) for num in random_sequence]


def display_winners(winners, voters_list, max_floor):
    print("\n===== 中奖用户信息 =====")
    print(f"总抽奖号码范围: 1-{max_floor}")
    print(f"中奖号码: {', '.join(map(str, winners))}")

    print("\n中奖用户详细名单:")
    for winner_floor in winners:
        if 1 <= winner_floor <= len(voters_list):
            voter = voters_list[winner_floor - 1]
            print(f"第{winner_floor}位 - ID: {voter['id']}, 用户名: {voter['username']}")
        else:
            print(f"无效楼层号: {winner_floor} (超出范围)")


# ====== 主执行流程 ======
try:
    print("=== 开始执行任务 ===")

    # 1. 获取主题数据
    topic_data = get_topic_data()
    print("√ 成功获取主题数据")

    # 2. 获取帖子和投票数据
    post_id, polls, close_flag = find_poll_data(topic_data)
    if close_flag:
        print(f"√ 投票已关闭,允许抽奖 (post_id: {post_id})")

    # 3. 查找关闭时间和正确选项
    poll_name, close_time_str, option_id = find_correct_option(polls)
    print(f"√ 找到正确选项: {option_id}")

    # 4. 导出投票用户
    voters_list, max_floor = export_voters(post_id, poll_name, option_id)

    # 5. 执行抽奖
    winners = lottery_draw(close_time_str, max_floor)

    # 6. 显示中奖结果
    display_winners(winners, voters_list, max_floor)

    print("\n=== 任务执行完成 ===")

except Exception as e:
    print(f"\n! 执行失败: {e}")

7 个赞

点开帖子后的我be like:

1 个赞