转载 如何发起一场透明公开且科学合理的抽奖
原文:https://www.nodeseek.com/post-92-1
作者:酒神
前段时间办了一场论坛内的小型抽奖活动,让我再次想到了一直琢磨的一个问题:怎样的抽奖才是完善的抽奖呢,能够保证举办方和参与者都认同,降低暗箱操作的可能性呢。
先上结论,个人认为一个好的抽奖活动应该具备以下几点核心要素:
- 结果不可提前预知,也就是主办方和用户都无从知晓
- 结果可以复现,公布的抽奖结果必须能够按照既定算法复现
- 教育成本和操作流程不能太过复杂,最好能够自动化或者通过点击实现
几种典型的反面教材:
- 到了开奖时间或者开奖前,提前用某个骰子生成器抽几个出来,违背了结果不可复现
- 用压缩包加密压缩提前生成的楼层,先公布压缩包,开奖后公布密码,违背了结果不可提前预知(主办方知道中奖楼层)
- 用各种加密与秘钥交换算法生成一套算法出来,网站方与发帖人同时持有必须的秘钥之一,违背了成本/流程原则
其实在举办此活动之前,我一直想在论坛上直接加入抽奖功能,流程上大概是主题帖可以选择性附加一个抽奖活动,设定奖池大小/奖品数目/开奖时间等,然后论坛帮忙完成后续的自动化流程即可。但是思来想去总是没有好的方案出来,期间也搜索了一些现有的方案逛了逛知乎和v2ex寻找灵感,终究是不能得偿所愿。最后能想到的办法要么教育用户其原理所需的成本太高(原理复杂讲不清),要么流程过于复杂。
逛LES论坛发现了一个很不错的抽奖方法,稍加改造就用到了本次活动中,讲一下原理背景及操作流程:
熵联盟
由Cloudflare、洛桑联邦理工学院(EPFL)、智利大学、Kudelski Security等众多权威机构主导的一个去中心化的随机信标(random beacon),根据一系列复杂的技术原理保证随机的可靠性,对原理感兴趣的可以点击这个扩展阅读。这个随机信标每30s生成一个随机字符串,大概长这样:
afa0dd948208c2f1d0ee62c3f121d60d6e702576544815118f1639d165cfc593
你可以访问https://api.drand.sh/来获取任意周期(每30s一个周期)的随机信标结果,具体的开发者手册见这里。我们知道在随机取样过程中,虽然取样算法各有不同,但是只要保证所用伪随机数的种子一样,那么取样结果就是固定的。
诶嘿,你发现了没有,这个随机信标就很适合当随机种子呀。剩下的事情就简单了,我们只需要获取开奖时刻所在周期的随机信标作为随机种子,然后用一个公开的抽样算法来取样就可以了。Cloudflare Beacon还有个优点是可以查询任何历史周期的随机信标结果,保证了你抽奖之后任何时间回头检查都可以知道有没有作弊。
随机取样过程
取样算法的设计就比较多了,一般都是自己固定一套流程出来,保证可以复现就行了。为了偷懒我用了random.org的随机取样器,他也是支持用户输入随机种子的:
这里注意,整个取样过程实际上是个全排列过程,比方说到截止时间共有233个楼层,则生成1-233楼层的一个全排列,主办方根据自己的规定依次校核结果序列中每个楼层是否符合要求,比如去掉重复发帖和格式不符等等,这样就可以得到最终的中奖人员。
简单的自动化js脚本
讲了这么多其实就是一句话,用CF Beacon做随机种子,然后对楼层做全排列依次校核中奖人员即可。
那么有没有简化流程的全自动js脚本呢,应该是比较好写的,这里举个例子,读者可以根据自己的要求改动:
let openTime = 'Fri Dec 02 2022 12:00:00 GMT+0800 (China Standard Time)' // 开奖时间
let maxFloor = 233 // 最大楼层
let beaconRound = (new Date(openTime).getTime() / 1000 - 1595431050) / 30
const MAINCHAIN = '8990e7a9aaed2ffed73dbd7092123d6f289930540d7651336225dc172e51b2ce'
let beaconUrl = `https://api.drand.sh/${MAINCHAIN}/public/${beaconRound}`
fetch(beaconUrl)
.then(r => r.json())
.then(r => {
let seed = r.randomness
let randomOrgUrl = `https://www.random.org/sequences/?min=1&max=${maxFloor}&col=1&format=plain&rnd=id.${seed}`
return fetch(randomOrgUrl)
.then(r => r.text())
.then(r => {
console.log(`中奖楼层为${r.replace(/\n/g, ', ')}`)
})
})
按照这个原理写了个抽奖程序,可以保证在主题帖确定,抽奖结束时间确定,正确答案确定的情况下,中奖者一定不变。而抽奖结束时间,也就是投票结束时间是在投票发贴的时候就必须确定的,过了发帖后的5分钟,管理员也无法修改。因为生成的是有资格参与抽奖者的全排列,所以我们按照帖子内描述奖品的顺序发放。
import requests
import csv
from datetime import datetime
right_answer = ("超腾讯视频10H")
target_post = 26886 # 主题帖ID
headers = {
"Content-Type": "application/json"
}
def get_topic_data():
topic_url = f"https://bb.zlb.ink/t/topic/{target_post}.json"
try:
response = requests.get(topic_url, timeout=10, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
raise Exception(f"获取主题数据失败: {e}")
def find_poll_data(topic_data):
posts = topic_data.get("post_stream", {}).get("posts", [])
original_post = next((p for p in posts if p.get("post_number") == 1), None)
if not original_post:
raise Exception("未找到原帖")
post_id = original_post.get("id")
if not post_id:
raise Exception("原帖缺失ID")
polls = original_post.get("polls")
if not polls or not isinstance(polls, list):
raise Exception("原帖未找到投票数据")
# 返回关闭时间、post_id 和 poll
return post_id, polls, original_post.get("closed")
def find_correct_option(polls):
close_time_str = None
correct_option_id = None
poll_name =None
# 查找正确答案选项(包含HTML标签和完整内容)
for poll in polls:
# 获取关闭时间(使用第一个找到的poll的关闭时间)
if not poll_name:
poll_name = poll.get("name")
if not close_time_str:
close_time_str = poll.get("close")
# 查找正确选项
for option in poll.get("options", []):
if right_answer in option.get("html", ""):
correct_option_id = option.get("id")
if not close_time_str:
raise Exception("未找到投票关闭时间")
if not correct_option_id:
raise Exception(f"未找到匹配'{right_answer}'的正确选项")
return poll_name, close_time_str, correct_option_id
def export_voters(post_id, poll_name, option_id):
params = {
"post_id": post_id,
"poll_name": poll_name,
"option_id": option_id,
"page": 1,
"limit": 50
}
base_url = "https://bb.zlb.ink/polls/voters.json"
seen_voter_ids = set()
collected_voters = [] # 用于收集所有投票者
total_voters_processed = 0
unique_voters_count = 0
print("开始获取投票数据...")
while True:
print(f"正在请求第 {params['page']} 页数据...")
try:
response = requests.get(base_url, headers=headers, params=params, timeout=10)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"请求失败:{e}")
break
try:
data = response.json()
except ValueError:
print(f"无法解析 JSON 响应: {response.text[:200]}...")
break
if not data:
print("响应数据为空")
break
voters_data = data.get("voters")
if voters_data is None:
print("'voters' 数据为 null,导出完成")
break
voter_list = voters_data.get(str(option_id), [])
if not voter_list:
print(f"第 {params['page']} 页无数据,导出完成")
break
print(f"第 {params['page']} 页获取到 {len(voter_list)} 条记录")
for voter in voter_list:
total_voters_processed += 1
voter_id = voter.get('id')
if voter_id and voter_id not in seen_voter_ids:
seen_voter_ids.add(voter_id)
collected_voters.append(voter) # 收集用户数据
unique_voters_count += 1
# 检查是否有下一页
if len(voter_list) < params['limit']:
print("已到达最后一页")
break
params["page"] += 1
# 按ID排序
collected_voters = sorted(collected_voters, key=lambda x: int(x.get('id', 0)))
print(f"处理了 {total_voters_processed} 条记录")
print(f"找到 {unique_voters_count} 个唯一用户")
return collected_voters, unique_voters_count
def lottery_draw(close_time_str, max_floor):
print(f"\n开始抽奖流程...")
print(f"关闭时间: {close_time_str}")
print(f"参与抽奖用户数: {max_floor}")
dt = datetime.fromisoformat(close_time_str.replace('Z', '+00:00'))
# 计算beaconRound
origin_timestamp = 1595431050
beacon_round = int((dt.timestamp() - origin_timestamp) // 30)
print(f"计算的 Beacon 轮次: {beacon_round}")
# 固定主链值
MAINCHAIN = '8990e7a9aaed2ffed73dbd7092123d6f289930540d7651336225dc172e51b2ce'
beacon_url = f'https://api.drand.sh/{MAINCHAIN}/public/{beacon_round}'
# 获取随机种子
try:
beacon_response = requests.get(beacon_url, timeout=10).json()
seed = beacon_response.get('randomness')
if not seed:
raise Exception(f"API响应中未找到随机种子: {beacon_response}")
print(f"获取到随机种子: {seed[:20]}...")
except Exception as e:
raise Exception(f"获取随机种子失败: {e}")
# 生成随机数序列
random_org_url = f'https://www.random.org/sequences/?min=1&max={max_floor}&col=1&format=plain&rnd=id.{seed}'
try:
response = requests.get(random_org_url, timeout=10)
response.raise_for_status()
random_sequence = response.text.strip().split("\n")
print(f"成功获取随机数序列")
except Exception as e:
raise Exception(f"获取随机数序列失败: {e}")
return [int(num) for num in random_sequence]
def display_winners(winners, voters_list, max_floor):
print("\n===== 中奖用户信息 =====")
print(f"总抽奖号码范围: 1-{max_floor}")
print(f"中奖号码: {', '.join(map(str, winners))}")
print("\n中奖用户详细名单:")
for winner_floor in winners:
if 1 <= winner_floor <= len(voters_list):
voter = voters_list[winner_floor - 1]
print(f"第{winner_floor}位 - ID: {voter['id']}, 用户名: {voter['username']}")
else:
print(f"无效楼层号: {winner_floor} (超出范围)")
# ====== 主执行流程 ======
try:
print("=== 开始执行任务 ===")
# 1. 获取主题数据
topic_data = get_topic_data()
print("√ 成功获取主题数据")
# 2. 获取帖子和投票数据
post_id, polls, close_flag = find_poll_data(topic_data)
if close_flag:
print(f"√ 投票已关闭,允许抽奖 (post_id: {post_id})")
# 3. 查找关闭时间和正确选项
poll_name, close_time_str, option_id = find_correct_option(polls)
print(f"√ 找到正确选项: {option_id}")
# 4. 导出投票用户
voters_list, max_floor = export_voters(post_id, poll_name, option_id)
# 5. 执行抽奖
winners = lottery_draw(close_time_str, max_floor)
# 6. 显示中奖结果
display_winners(winners, voters_list, max_floor)
print("\n=== 任务执行完成 ===")
except Exception as e:
print(f"\n! 执行失败: {e}")