NIKE SNKRS 抢购流程分析与自动化抢购实现
[留言不会有我的任何响应,请慎重在留言中包含个人联系方式!]
本来是打算帮朋友写一个小程序,据说在网上找了很多人都实现不了nike的自动化抢购的功能,不过后来发现是拿去搞批量购买牟利有点灰产的意思,所以拒绝了继续开发,本来想着核心都实现了他们要搞自己搞吧,然而可笑的是我拒绝后还被这个“朋友”的所谓的“合伙人”嘲讽了一番,呵呵,已拉黑。
实际上nike的开发者也在打击程序自动化抢购,已尝试给他们反馈了相关问题。
nike出的新品鞋会发布在:https://www.nike.com/cn/launch/?s=upcoming ,虽然售价上千却供不应求,反正我是买不起,而抢购时间段通常在早上9点。
抓包分析后发现确实比较麻烦,因为抢购的时间过短,而且请求较多不易分析,某些请求还有访问频率的限制,一些参数是通过js来的,js被混淆后可读性差,而通过真实浏览器动态解析效率较低,登陆过多还会出现登陆异常,可能是有一些人机验证的规则,出现登陆异常发现只要在旁边乱点几下就可以正常登陆了。
最终通过分析app的请求实现了自动化抢购,相较于web的请求app请求更加直观,且反编译apk后有一定可读性。
首先要实现的是登陆,仅仅通过帐号密码无法登陆,还需要验证cookies中的bm_sz和_abck,尝试了通过请求某些功能来实现set-cookie这两个值但是set的值不能登陆。只能通过web的成功登陆后的set-cookie用于app才可以登陆。
而这个时候发现了tokenRefresh请求,用来保持app上的帐号登陆,当登陆的token失效后通过tokenRefresh来刷新,而这个功能没有任何请求的限制,所以帐号只需要登陆一次就可以通过tokenRefresh保持登陆一直使用。
所以就有了在web通过google headless批量登陆获取bm_sz和_abck值,然后模拟手机登陆请求获取refresh_token,就可以永久的使用这个账号来执行相关操作了。参考代码如下:
import json import random import traceback import requests from selenium import webdriver from selenium.webdriver import ActionChains from selenium.webdriver.support.wait import WebDriverWait class WebLogin_Chrome: def __init__(self, username, password): self.cookies = None self.session = requests.session() self.userInfo = None self.username = username self.password = password def login(self): try: chromeOptions = webdriver.ChromeOptions() # 无界面 # chromeOptions.add_argument('--headless') # 禁用gpu加速 chromeOptions.add_argument('--disable-gpu') # 关闭图片 prefs = {"profile.managed_default_content_settings.images": 2} chromeOptions.add_experimental_option("prefs", prefs) driver = webdriver.Chrome(chrome_options=chromeOptions) w_h = driver.get_window_size() width = w_h["width"] height = w_h["height"] clickWidth1 = (width - 500) / 2 clickWidth2 = width / 2 + 250 driver.get("https://www.nike.com/cn/zh_cn/") WebDriverWait(driver, 15).until(lambda x: x.find_element_by_class_name('login-text')) driver.find_element_by_class_name('login-text').click() # 随机位置点击绕过验证 for i in range(random.randint(2, 5)): ActionChains(driver).move_by_offset(clickWidth1, random.randint(0, height)).click().perform() ActionChains(driver).move_by_offset(clickWidth2, random.randint(0, height)).click().perform() driver.find_element_by_name('verifyMobileNumber').send_keys(self.username) driver.find_element_by_name('password').send_keys(self.password) driver.find_element_by_class_name('nike-unite-submit-button').click() # 随机位置点击绕过验证 for i in range(random.randint(2, 5)): ActionChains(driver).move_by_offset(clickWidth1, random.randint(0, height)).click().perform() ActionChains(driver).move_by_offset(clickWidth2, random.randint(0, height)).click().perform() try: WebDriverWait(driver, 5).until_not( lambda x: x.find_element_by_class_name('exp-join-login').is_displayed()) except: # print("等待超时...") pass if not driver.find_element_by_xpath('//*[@id="nike-unite-mobileLoginForm"]/div[1]').is_displayed(): WebDriverWait(driver, 10).until_not( lambda x: x.find_element_by_class_name('exp-join-login').is_displayed()) self.cookies = driver.get_cookies() driver.get("https://unite.nike.com/session.html") userInfo = driver.execute_script( "return localStorage.getItem('com.nike.commerce.nikedotcom.web.credential');") self.userInfo = json.loads(userInfo) except: traceback.print_exc() finally: driver.close() driver.quit() def getCookies(self): cookies = "" if self.cookies != None: for cookie in self.cookies: cookies += (cookie['name'] + "=" + cookie['value'] + ";") return cookies-----------------
import json import socket import ssl import traceback import uuid from WebLogin import WebLogin_Chrome class LoginInit: def __init__(self, username, password): self.username = "+86" + username self.password = password self.host = "s3.nikecdn.com" self.visitorId = str(uuid.uuid4()) self.clientId = "G64vA0b95ZruUtGk1K0FkAgaO3Ch30sj" self.userAgent = "Mozilla/5.0 (iPhone; CPU iPhone OS 11_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15F79" webLogin = WebLogin_Chrome(username, password) webLogin.login() self.cookies = webLogin.getCookies() # 获取app登陆请求 def getLoginRequests(self): payload = '{{"username":"{usr}","password":"{pwd}","client_id":"{clientId}","ux_id":"com.nike.commerce.snkrs.ios","grant_type":"password"}}'.format( usr=self.username, pwd=self.password, clientId=self.clientId) head = '''POST /login?appVersion=454&experienceVersion=375&uxid=com.nike.commerce.snkrs.ios&locale=zh_CN&backendEnvironment=identity&browser=Apple%20Computer%2C%20Inc.&os=undefined&mobile=true&native=true&visit=1&visitor={visitorId} HTTP/1.1 Host: {host} Content-Type: application/json Origin: https://{host} Cookie: {cookies} Content-Length: {length} Connection: close Accept: */* User-Agent: {userAgent} Referer: https://s3.nikecdn.com/unite/mobile.html?mid=66794190406425515927935901233201301138?iOSSDKVersion=2.8.4&clientId=G64vA0b95ZruUtGk1K0FkAgaO3Ch30sj&uxId=com.nike.commerce.snkrs.ios&view=none&locale=zh_CN&backendEnvironment=identity Accept-Language: zh-cn'''.format(visitorId=self.visitorId, host=self.host, userAgent=self.userAgent, length=len(payload), cookies=self.cookies) data = head + "\r\n\r\n" + payload return data def getDataFromResponse(self, data): data = json.loads(data.split("\r\n\r\n")[1]) return data # 发送请求 def sendRequestsToHost(self, data): sock = ssl.wrap_socket(socket.socket()) sock.connect((self.host, 443)) sock.sendall(bytes(data, encoding='utf-8')) # recv_data = sock.recv(10240).decode('utf-8') result = "" while True: try: recv_data = sock.recv(2048) result += recv_data.decode('utf-8') except socket.error as err_msg: print('Error receiving data: %s' % err_msg) if not len(recv_data): break sock.close() return result
实现将帐号app登陆后的refresh_token存入数据库备用
有了永久保持登陆的账号后继续分析抢购的请求,发现抢购最终是post请求到https://api.nike.com/launch/entries/v2,但是其中多个值是由其他请求进行设置或者获取,具体分析见代码
class BuySnkrs: def __init__(self, refreshToken, skuId, launchId, productId, postpayLink): self.refreshToken = refreshToken self.skuId = skuId self.launchId = launchId self.productId = productId self.postpayLink = postpayLink self.host = "s3.nikecdn.com" self.apiHost = "api.nike.com" self.visitorId = str(uuid.uuid4()) self.clientId = "G64vA0b95ZruUtGk1K0FkAgaO3Ch30sj" self.userAgent = "Mozilla/5.0 (iPhone; CPU iPhone OS 11_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15F79" self.apiUserAgent = "SNKRS/3.3.3 (iPhone; iOS 11.4; Scale/2.00)" self.xNewRelicID = "VQYGVF5SCBADUVBRBgAGVg==" self.xNikeCallerId = "nike:snkrs:ios:3.3" self.checkoutId = str(uuid.uuid4()) self.shippingId = str(uuid.uuid4()) self.paymentsId = str(uuid.uuid4()) self.times = self.getUtcTime() self.token = self.getTokenRefresh() # 格式化一些购买过程中所需的参数 self.userCommerce = self.getUserCommerce() self.email = self.userCommerce["emails"]["primary"]["email"] # print(self.userCommerce["address"]) # for i in self.userCommerce["address"]: # address = self.userCommerce["address"][i] # break address = self.userCommerce["address"]["shipping"] state = address["province"] city = address["locality"] county = address["zone"] address1 = address["line1"] try: address2 = address["line2"] except: address2 = " " postalCode = address["code"] country = address["country"] self.addressInfo = {"state": state, "city": city, "address1": address1, "postalCode": postalCode, "address2": address2, "county": county, "country": country} name = address["name"]["primary"] lastName = name["given"] firstName = name["family"] self.recipientInfo = {"lastName": lastName, "firstName": firstName} phone = address["phone"]["primary"] self.contactInfo = {"phoneNumber": phone, "email": self.email} # 格式化最终抢购所需的参数 self.launchRecipient = {"lastName": lastName, "firstName": firstName, "email": self.email, "phoneNumber": phone} self.launchAddress = {"state": state, "city": city, "address1": address1, "county": county, "country": "CN"} # 获取服务器时区的时间 def getUtcTime(self): utcTime = time.gmtime() return utcTime # 获取最新发布的产品 def gwtProductFeed(self): response = requests.get( "https://api.nike.com/commerce/productfeed/products/snkrs/threads?country=CN&limit=5&locale=zh_CN&skip=0&withCards=true").json() return response # 获取新的通知信息 def getNotifications(self): times = time.strftime("%Y-%m-%dT%H%%3A%M%%3A%S.000%%2B0000", self.times) url = "https://api.nike.com/plus/v3/notifications/me/stored?since={time}&limit=10&locale=zh-Hans_CN".format( time=times) headers = { "deliveryId": "com.nike.onenikecommerce", "appid": "com.nike.commerce.snkrs.ios", "Authorization": "Bearer " + self.token, "User-Agent": self.apiUserAgent, "X-NewRelic-ID": self.xNewRelicID } response = requests.get(url, headers=headers).json() return response # 获取订单信息 def getOrderHistory(self): url = "https://api.nike.com/commerce/ap/orderhistory?action=getOrderHistoryList&country=CN" headers = { "X-NewRelic-ID": self.xNewRelicID, "User-Agent": self.apiUserAgent, "Authorization": "Bearer " + self.token } response = requests.get(url, headers=headers).text return response # 刷新登陆凭证 def getTokenRefresh(self): url = "https://unite.nikecloud.com/tokenRefresh?backendEnvironment=identity&locale=zh_CN&mobile=true&native=true&uxId=com.nike.commerce.snkrs.ios&sdkVersion=2.8.4&backendEnvironment=identity&platform=ios&browser=uniteSDK" data = {"client_id": self.clientId, "grant_type": "refresh_token", "refresh_token": self.refreshToken} r = requests.post(url, json=data).json() return r["access_token"] # 获取用户基本信息 def getUserCommerce(self): url = "https://api.nike.com/user/commerce" headers = { "X-NewRelic-ID": self.xNewRelicID, "User-Agent": self.apiUserAgent, "Authorization": "Bearer " + self.token, "X-NIKE-UX-ID": "com.nike.commerce.snkrs.ios" } response = requests.get(url, headers=headers).json() return response def setShippingOptionsId(self): url = "https://api.nike.com/buy/shipping_options/v2" headers = { "Accept": "application/json", "Authorization": "Bearer " + self.token, "Content-Type": "application/json; charset=utf-8", "x-nike-caller-id": self.xNikeCallerId, "User-Agent": self.apiUserAgent, "X-NewRelic-ID": self.xNewRelicID } data = {"items": [{"id": self.shippingId, "shippingAddress": self.addressInfo, "skuId": self.skuId}], "currency": "CNY", "country": "CN"} response = requests.post(url, headers=headers, json=data).json() return response def setCheckoutId(self): url = "https://api.nike.com/buy/checkout_previews/v2/" + self.checkoutId headers = { "Accept": "application/json", "Authorization": "Bearer " + self.token, "Content-Type": "application/json", "x-nike-caller-id": self.xNikeCallerId, "User-Agent": self.apiUserAgent, "X-NewRelic-ID": self.xNewRelicID } data = {"request": {"email": self.email, "clientInfo": {"deviceId": "", "client": "com.nike.commerce.snkrs.ios"}, "currency": "CNY", "items": [{"recipient": self.recipientInfo, "shippingAddress": self.addressInfo, "id": self.shippingId, "quantity": 1, "skuId": self.skuId, "shippingMethod": "GROUND_SERVICE", "contactInfo": self.contactInfo}], "channel": "SNKRS", "locale": "zh_CN", "country": "CN"}} response = requests.put(url, headers=headers, json=data).json() return response def getPriceChecksum(self): url = "https://api.nike.com/buy/checkout_previews/v2/jobs/" + self.checkoutId headers = { "Accept": "application/json", "X-NewRelic-ID": self.xNewRelicID, "x-nike-caller-id": self.xNikeCallerId, "Authorization": "Bearer " + self.token, "User-Agent": self.apiUserAgent } response = requests.get(url, headers=headers).json() totalPrice = response["response"]["totals"]["total"] priceChecksum = response["response"]["priceChecksum"] return totalPrice, priceChecksum def getPaymentToken(self, totalPrice): url = "https://api.nike.com/payment/preview/v2" headers = { "Accept": "application/json; charset=utf-8", "Authorization": "Bearer " + self.token, "Content-Type": "application/json", "x-nike-caller-id": self.xNikeCallerId, "User-Agent": self.apiUserAgent, "X-NewRelic-ID": self.xNewRelicID } data = {"total": totalPrice, "items": [{"productId": self.productId, "shippingAddress": self.addressInfo}], "checkoutId": self.checkoutId, "currency": "CNY", "paymentInfo": [ {"id": self.paymentsId, "type": "Alipay", "billingInfo": {"name": self.recipientInfo, "contactInfo": self.contactInfo, "address": self.addressInfo}}], "country": "CN"} response = requests.post(url, headers=headers, json=data).json() paymentToken = response["id"] return paymentToken def getDataFromResponse(self, data): data = json.loads(data.split("\r\n\r\n")[1]) return data # 发送请求 def sendRequestsToHost(self, data): sock = ssl.wrap_socket(socket.socket()) sock.connect((self.host, 443)) sock.sendall(bytes(data, encoding='utf-8')) # recv_data = sock.recv(10240).decode('utf-8') result = "" while True: try: recv_data = sock.recv(2048) result += recv_data.decode('utf-8') except socket.error as err_msg: print('Error receiving data: %s' % err_msg) if not len(recv_data): break sock.close() return result def sendRequestsToApiHost(self, data): sock = ssl.wrap_socket(socket.socket()) sock.connect((self.apiHost, 443)) sock.sendall(bytes(data, encoding='utf-8')) # recv_data = sock.recv(10240) result = "" while True: try: recv_data = sock.recv(2048) result += recv_data.decode('utf-8') except socket.error as err_msg: print('Error receiving data: %s' % err_msg) if not len(recv_data): break sock.close() return result def launchEntrie(self, paymentToken, priceChecksum): url = "https://api.nike.com/launch/entries/v2" headers = { "Authorization": "Bearer " + self.token, "Content-Type": "application/json", "x-nike-caller-id": self.xNikeCallerId, "User-Agent": self.apiUserAgent, "X-NewRelic-ID": self.xNewRelicID } data = {"deviceId": "", "postpayLink": self.postpayLink, "checkoutId": self.checkoutId, "currency": "CNY", "paymentToken": paymentToken, "shipping": {"recipient": self.launchRecipient, "method": "GROUND_SERVICE", "address": self.launchAddress}, "skuId": self.skuId, "channel": "SNKRS", "launchId": self.launchId, "locale": "zh_CN", "priceChecksum": priceChecksum} response = requests.post(url, headers=headers, json=data).json() return response-----------------
import json import re import requests def getPostpayLink(productId, shoesLink): url = "https://api.nike.com/merch/products/v2/" + productId response = requests.get(url).json() postpayLink = shoesLink + "?LEStyleColor={styleColor}&LEPaymentType=Alipay".format( styleColor=response["styleColor"]) return postpayLink def buySet(shoesUrl): response = requests.get(shoesUrl).text launchId = re.findall(r"\"launchViewId\":\"(.*?)\"", response, re.S | re.I)[0] size = re.findall(r"\"sizes\":(.*?),\"_fetchedAt\"", response, re.S | re.I)[0] productId = re.findall(r"product\":{\"productId\":\"(.*?)\"", response, re.S | re.I)[0] shoesLink = re.findall(r"canonical\" href=\"(.*?)\"", response, re.S | re.I)[0] postpayLink = getPostpayLink(productId, shoesLink) stock = [] outOfStock = [] j_size = json.loads(size) for s in j_size: if j_size[s]["available"]: stock.append(j_size[s]["localizedSize"]) else: outOfStock.append(j_size[s]["localizedSize"]) stock.sort() outOfStock.sort() if len(outOfStock) > 0: print(" ".join(outOfStock) + "尺码缺货\n请选择" + " ".join(stock)) else: print("所有尺码有货\n请选择" + " ".join(stock)) shoesSize = input("-->") for s in j_size: if j_size[s]["localizedSize"] == shoesSize: usShoesSize = s skuId = j_size[s]["skuId"] # print("请设置好默认收货地址") # print("请选择支付方式 1.银联 2.微信 3.支付宝") # purMethod = input("-->") # shoesBuyUrl = shoesUrl + "/?productId=" + launchId + "&size=" + usShoesSize return shoesSize, launchId, skuId, productId, postpayLink
最后用两个进程分别负责抢购和监控抢购
if __name__ == '__main__': shoesSize, launchId, skuId, productId, postpayLink = buySet( "https://www.nike.com/cn/launch/t/air-revaderchi-gym-red-mink-brown/") threads = int(input("根据参与用户数量自行设置一个购买和监控的线程数:")) userInfoList = [] def bulkPurchase(orderQueue): def buy(refreshToken, username, password): try: app = BuySnkrs(refreshToken, skuId, launchId, productId, postpayLink) setShippingOptionsIdResponse = app.setShippingOptionsId() # print(setShippingOptionsIdResponse) setCheckoutIdResponse = app.setCheckoutId() # print(setCheckoutIdResponse) totalPrice, priceChecksum = app.getPriceChecksum() # print(totalPrice, priceChecksum) paymentToken = app.getPaymentToken(totalPrice) # print(paymentToken) launchEntrie = app.launchEntrie(paymentToken, priceChecksum) userInfo = [refreshToken, launchEntrie["id"], username, password] orderQueue.put(userInfo) except: traceback.print_exc() finally: td.release() conn = sqlite3.connect("nike.db") conn.text_factory = str c = conn.cursor() allUser = c.execute("SELECT * from nike").fetchall() td = threading.BoundedSemaphore(threads) threadlist = [] for user in allUser: username = user[2] password = user[3] refreshToken = user[1] td.acquire() t = threading.Thread(target=buy, args=(refreshToken, username, password)) t.start() threadlist.append(t) for x in threadlist: x.join() c.close() def orderStatus(orderQueue): def checkStatus(u): try: token, order, usr, pwd = u url = "https://unite.nikecloud.com/tokenRefresh?backendEnvironment=identity&locale=zh_CN&mobile=true&native=true&uxId=com.nike.commerce.snkrs.ios&sdkVersion=2.8.4&backendEnvironment=identity&platform=ios&browser=uniteSDK" data = {"client_id": "G64vA0b95ZruUtGk1K0FkAgaO3Ch30sj", "grant_type": "refresh_token", "refresh_token": token} r = requests.post(url, json=data).json() accessToken = r["access_token"] headers = {"authorization": "Bearer " + accessToken} response = requests.get("https://api.nike.com/launch/entries/v2/" + order, headers=headers).json() print(usr, response) try: result = response["result"] except: return if result["status"] == "WINNER": print("抢购成功, 帐号:%s 密码:%s (自行登陆手机app可查看订单)" % (usr, pwd)) userInfoList.remove(u) elif result["status"] == "NON_WINNER": print("%s 抢购失败" % usr) userInfoList.remove(u) except: traceback.print_exc() finally: td.release() while True: if not orderQueue.empty(): userInfo = orderQueue.get(True) userInfoList.append(userInfo) else: if len(userInfoList) != 0: td = threading.BoundedSemaphore(threads) threadlist = [] for u in userInfoList: td.acquire() t = threading.Thread(target=checkStatus, args=(u,)) t.start() threadlist.append(t) for x in threadlist: x.join() else: time.sleep(2) orderQueue = Queue() buy = Process(target=bulkPurchase, args=(orderQueue,)) status = Process(target=orderStatus, args=(orderQueue,)) buy.start() status.start()瞎写着玩的,只做简单记录。
评论:
2018-10-25 14:30
如果对https流量解密的话,pc和手机端都会在登录阶段出问题,抓到的登录响应是403;而如果不进行流量解密,登录是可以正常进行的。
///所以你在抓包的时候也碰见了这样的情况吗?。
2018-10-07 10:25
mitmproxy?
还是修改 chromeDriver 的 $cdc 变量重新编译呢?
很想和你讨论一下这个技术方案,我的邮箱是 [email protected],期待你的回复! Thanks
2018-09-13 22:57
求一份源文件....复现不来啊...
通过第一段代码和第二段代码
print(LoginInit('888888','888888').getLoginRequests())
通过这个和
WebLogin里面的cookies
拿到了两组数据
但是都没看到userid和refreshtoken
所以...求一份源码 [email protected] or [email protected]
你是衣冠楚楚的人 而我只是一个打满补丁的猴子
-
小博客一个,没必要伤害她
热门文章
存档
标签
最新评论
- yz
想想你喜欢什么,想做什么,找好一个自己的... - 小屿
@Jahan:testfun1024#p... - Jahan
Hello dear Xia0 i a... - brave
@万:你的手机应该是anroid7.0以... - jhsy
新版的cookie机制应该又变了. 而且... - 小屿
@janto:无兴趣 - janto
新版的这些好像不起作用了,deviceI... - hunk
正在研究,可否发一份新源码?todz$1... - miffy
请问可以加个好友咨询下吗? - vegetableChicken
@Snkrs:我也遇到和你一样的问题了,...
2018-11-19 07:04