NIKE SNKRS: Purchase Flow Analysis and an Automated Purchase Implementation

2018-8-5 小屿 Python

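I originally set out to write a small program for a friend, who said he had asked plenty of people online and none of them could get automated Nike purchasing to work. Later I found out it was meant for bulk buying and reselling at a profit, which is gray-market territory, so I refused to continue development. My thinking was that the core was already done, so if they wanted to go ahead they could finish it themselves. Laughably, after I refused, this "friend's" so-called "business partner" mocked me for it. Heh. Blocked.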


In fact, Nike's developers are also cracking down on automated purchasing, and I have tried to report the relevant issues to them.


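Nike's new shoe releases are listed at https://www.nike.com/cn/launch/?s=upcoming . Despite prices above a thousand yuan, demand outstrips supply (I certainly can't afford them), and the sale window usually opens at 9 a.m.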


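Packet capture showed that this is indeed fairly troublesome: the sale window is very short, there are many requests and they are hard to analyze, some requests are rate-limited, and some parameters are generated by JavaScript that is obfuscated and hard to read. Driving a real browser to evaluate it dynamically is slow, and logging in too many times triggers a login error, probably because of some human-verification rules. When the login error appeared, I found that clicking around randomly next to the form a few times was enough to log in normally.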

1.jpg


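In the end I implemented automated purchasing by analyzing the app's requests. Compared with the web requests, the app's requests are more straightforward, and the decompiled APK is reasonably readable.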


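The first thing to implement is login. A username and password alone are not enough; the bm_sz and _abck cookies are also validated. I tried requesting certain endpoints to have these two values set via Set-Cookie, but the values obtained that way could not be used to log in. Only the Set-Cookie values from a successful web login work when reused for the app login.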

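At this point I found the tokenRefresh request, which keeps an account logged in on the app: when the login token expires, it is renewed through tokenRefresh. This endpoint has no request limits at all, so an account only needs to log in once and can then stay logged in indefinitely through tokenRefresh.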
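
As a rough illustration of the "refresh only when the token has expired" idea, the sketch below caches an access token and calls a refresh function lazily. `TokenCache`, `refresh_fn`, the injectable `clock`, and the `expires_in` field are hypothetical names assumed for illustration; the code in this post only relies on `access_token` being present in the tokenRefresh response.

```python
import time


class TokenCache:
    """Cache an access token and refresh it lazily (illustrative sketch)."""

    def __init__(self, refresh_fn, clock=time.time):
        # refresh_fn is a hypothetical callable returning a dict such as
        # {"access_token": "...", "expires_in": 3600}
        self._refresh_fn = refresh_fn
        self._clock = clock
        self._token = None
        self._expires_at = 0.0

    def get(self):
        # Refresh only when there is no token yet or the cached one has expired
        if self._token is None or self._clock() >= self._expires_at:
            data = self._refresh_fn()
            self._token = data["access_token"]
            # expires_in is an assumed field; defaulting to 0 means a response
            # without it is simply refreshed on every call
            self._expires_at = self._clock() + data.get("expires_in", 0)
        return self._token


# Usage with a fake refresh function and a fake clock (no network access)
calls = []
now = [1000.0]


def fake_refresh():
    calls.append(1)
    return {"access_token": "token-%d" % len(calls), "expires_in": 60}


cache = TokenCache(fake_refresh, clock=lambda: now[0])
first = cache.get()    # triggers a refresh
second = cache.get()   # served from the cache
now[0] += 61           # advance the fake clock past the expiry
third = cache.get()    # triggers another refresh
```

Injecting the clock and the refresh function keeps the sketch testable without touching any real endpoint.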


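So the approach is: log in in bulk on the web with headless Google Chrome to obtain the bm_sz and _abck values, then simulate the mobile login request to obtain a refresh_token, after which the account can be used indefinitely for the relevant operations. Reference code follows: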

import json
import random
import traceback

import requests
from selenium import webdriver
from selenium.webdriver import ActionChains
from selenium.webdriver.support.wait import WebDriverWait


class WebLogin_Chrome:
    def __init__(self, username, password):
        self.cookies = None
        self.session = requests.session()
        self.userInfo = None

        self.username = username
        self.password = password

    def login(self):
        try:
            chromeOptions = webdriver.ChromeOptions()
            # Headless mode (left disabled here)
            # chromeOptions.add_argument('--headless')
            # Disable GPU acceleration
            chromeOptions.add_argument('--disable-gpu')
            # Disable image loading
            prefs = {"profile.managed_default_content_settings.images": 2}
            chromeOptions.add_experimental_option("prefs", prefs)

            driver = webdriver.Chrome(chrome_options=chromeOptions)

            w_h = driver.get_window_size()
            width = w_h["width"]
            height = w_h["height"]

            clickWidth1 = (width - 500) / 2
            clickWidth2 = width / 2 + 250

            driver.get("https://www.nike.com/cn/zh_cn/")
            WebDriverWait(driver, 15).until(lambda x: x.find_element_by_class_name('login-text'))
            driver.find_element_by_class_name('login-text').click()

            # Click at random positions to get past the verification
            for i in range(random.randint(2, 5)):
                ActionChains(driver).move_by_offset(clickWidth1,
                                                    random.randint(0, height)).click().perform()
                ActionChains(driver).move_by_offset(clickWidth2,
                                                    random.randint(0, height)).click().perform()

            driver.find_element_by_name('verifyMobileNumber').send_keys(self.username)
            driver.find_element_by_name('password').send_keys(self.password)
            driver.find_element_by_class_name('nike-unite-submit-button').click()

            # Click at random positions to get past the verification
            for i in range(random.randint(2, 5)):
                ActionChains(driver).move_by_offset(clickWidth1,
                                                    random.randint(0, height)).click().perform()
                ActionChains(driver).move_by_offset(clickWidth2,
                                                    random.randint(0, height)).click().perform()
            try:
                WebDriverWait(driver, 5).until_not(
                    lambda x: x.find_element_by_class_name('exp-join-login').is_displayed())
            except:
                # print("Wait timed out...")
                pass
            if not driver.find_element_by_xpath('//*[@id="nike-unite-mobileLoginForm"]/div[1]').is_displayed():
                WebDriverWait(driver, 10).until_not(
                    lambda x: x.find_element_by_class_name('exp-join-login').is_displayed())

                self.cookies = driver.get_cookies()

                driver.get("https://unite.nike.com/session.html")
                userInfo = driver.execute_script(
                    "return localStorage.getItem('com.nike.commerce.nikedotcom.web.credential');")
                self.userInfo = json.loads(userInfo)
        except:
            traceback.print_exc()
        finally:
            # driver is unbound if webdriver.Chrome() itself failed to start
            if 'driver' in locals():
                driver.quit()

    def getCookies(self):
        cookies = ""
        if self.cookies is not None:
            for cookie in self.cookies:
                cookies += (cookie['name'] + "=" + cookie['value'] + ";")
        return cookies
-----------------
import json
import socket
import ssl
import traceback
import uuid

from WebLogin import WebLogin_Chrome


class LoginInit:
    def __init__(self, username, password):
        self.username = "+86" + username
        self.password = password

        self.host = "s3.nikecdn.com"
        self.visitorId = str(uuid.uuid4())
        self.clientId = "G64vA0b95ZruUtGk1K0FkAgaO3Ch30sj"
        self.userAgent = "Mozilla/5.0 (iPhone; CPU iPhone OS 11_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15F79"

        webLogin = WebLogin_Chrome(username, password)
        webLogin.login()
        self.cookies = webLogin.getCookies()

    # Build the raw app login request
    def getLoginRequests(self):
        # json.dumps escapes quotes and backslashes in the credentials correctly
        payload = json.dumps(
            {"username": self.username, "password": self.password, "client_id": self.clientId,
             "ux_id": "com.nike.commerce.snkrs.ios", "grant_type": "password"},
            separators=(",", ":"))

        head = '''POST /login?appVersion=454&experienceVersion=375&uxid=com.nike.commerce.snkrs.ios&locale=zh_CN&backendEnvironment=identity&browser=Apple%20Computer%2C%20Inc.&os=undefined&mobile=true&native=true&visit=1&visitor={visitorId} HTTP/1.1
Host: {host}
Content-Type: application/json
Origin: https://{host}
Cookie: {cookies}
Content-Length: {length}
Connection: close
Accept: */*
User-Agent: {userAgent}
Referer: https://s3.nikecdn.com/unite/mobile.html?mid=66794190406425515927935901233201301138?iOSSDKVersion=2.8.4&clientId=G64vA0b95ZruUtGk1K0FkAgaO3Ch30sj&uxId=com.nike.commerce.snkrs.ios&view=none&locale=zh_CN&backendEnvironment=identity
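Accept-Language: zh-cn'''.format(visitorId=self.visitorId, host=self.host, userAgent=self.userAgent,
                                 length=len(payload.encode('utf-8')), cookies=self.cookies)

        # HTTP header lines must end in CRLF, and Content-Length counts bytes, not characters
        data = head.replace("\n", "\r\n") + "\r\n\r\n" + payload
        return data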

    def getDataFromResponse(self, data):
        # Split only on the first blank line, which separates headers from body
        data = json.loads(data.split("\r\n\r\n", 1)[1])
        return data

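    # Send a raw HTTPS request to self.host and return the full response text
    def sendRequestsToHost(self, data):
        # ssl.wrap_socket() was removed in Python 3.12; use an SSLContext instead
        context = ssl.create_default_context()
        sock = context.wrap_socket(socket.socket(), server_hostname=self.host)
        sock.connect((self.host, 443))

        sock.sendall(bytes(data, encoding='utf-8'))
        chunks = []
        while True:
            try:
                recv_data = sock.recv(2048)
            except socket.error as err_msg:
                print('Error receiving data: %s' % err_msg)
                break
            if not recv_data:
                break
            chunks.append(recv_data)
        sock.close()
        # Decode once at the end so multi-byte characters split across chunks survive
        return b"".join(chunks).decode('utf-8')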
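
getDataFromResponse above takes everything after the first blank line as the JSON body. A slightly fuller sketch of the same idea, which also keeps the status code and headers, might look like this. `parse_raw_response` is a hypothetical helper, and it assumes the body is plain JSON that is neither chunked nor compressed.

```python
import json


def parse_raw_response(raw):
    """Split a raw HTTP/1.1 response into (status_code, headers, json_body).

    Assumes the body is plain JSON and is not chunked or compressed.
    """
    # partition() splits on the first blank line only
    head, _, body = raw.partition("\r\n\r\n")
    lines = head.split("\r\n")
    # The status line looks like "HTTP/1.1 200 OK"
    status_code = int(lines[0].split(" ", 2)[1])
    headers = {}
    for line in lines[1:]:
        name, _, value = line.partition(":")
        # Header names are case-insensitive, so store them lowercased
        headers[name.strip().lower()] = value.strip()
    return status_code, headers, json.loads(body)


# Made-up sample response for illustration
sample = (
    "HTTP/1.1 200 OK\r\n"
    "Content-Type: application/json\r\n"
    "\r\n"
    '{"access_token": "abc"}'
)
status, headers, body = parse_raw_response(sample)
```

Checking the status code before parsing the body makes failed logins easier to tell apart from malformed responses.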


This logs the account in through the app and stores the resulting refresh_token in a database for later use.

2.png
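
The post does not show the database code, so the following is only a sketch of how the refresh_token could be stored in SQLite. The table name `nike` and the column order (index 1 for the refresh token, 2 for the username, 3 for the password) follow how the main script at the end reads its rows; the column names, the `id` column, and the helpers `open_db` and `save_account` are assumptions.

```python
import sqlite3


def open_db(path=":memory:"):
    conn = sqlite3.connect(path)
    # Column order matches how the main script indexes rows:
    # row[1] = refresh token, row[2] = username, row[3] = password
    conn.execute(
        "CREATE TABLE IF NOT EXISTS nike ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "refresh_token TEXT NOT NULL, "
        "username TEXT NOT NULL UNIQUE, "
        "password TEXT NOT NULL)"
    )
    return conn


def save_account(conn, username, password, refresh_token):
    # Update the stored token if the account exists, otherwise insert it
    cur = conn.execute(
        "UPDATE nike SET refresh_token = ?, password = ? WHERE username = ?",
        (refresh_token, password, username),
    )
    if cur.rowcount == 0:
        conn.execute(
            "INSERT INTO nike (refresh_token, username, password) VALUES (?, ?, ?)",
            (refresh_token, username, password),
        )
    conn.commit()


# Usage against an in-memory database
conn = open_db()
save_account(conn, "user1", "pw1", "old-token")
save_account(conn, "user1", "pw1", "new-token")
rows = conn.execute("SELECT * FROM nike").fetchall()
```

Saving the same account twice leaves a single row holding the newer token.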


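With accounts that stay logged in permanently, I went on to analyze the purchase requests. The purchase is ultimately a POST to https://api.nike.com/launch/entries/v2, but several of its values are set or obtained through other requests; see the code for the details.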

import json
import socket
import ssl
import time
import uuid

import requests


class BuySnkrs:
    def __init__(self, refreshToken, skuId, launchId, productId, postpayLink):
        self.refreshToken = refreshToken

        self.skuId = skuId
        self.launchId = launchId
        self.productId = productId
        self.postpayLink = postpayLink

        self.host = "s3.nikecdn.com"
        self.apiHost = "api.nike.com"
        self.visitorId = str(uuid.uuid4())
        self.clientId = "G64vA0b95ZruUtGk1K0FkAgaO3Ch30sj"
        self.userAgent = "Mozilla/5.0 (iPhone; CPU iPhone OS 11_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15F79"

        self.apiUserAgent = "SNKRS/3.3.3 (iPhone; iOS 11.4; Scale/2.00)"
        self.xNewRelicID = "VQYGVF5SCBADUVBRBgAGVg=="
        self.xNikeCallerId = "nike:snkrs:ios:3.3"

        self.checkoutId = str(uuid.uuid4())
        self.shippingId = str(uuid.uuid4())
        self.paymentsId = str(uuid.uuid4())

        self.times = self.getUtcTime()

        self.token = self.getTokenRefresh()

        # Prepare some parameters needed during the purchase flow
        self.userCommerce = self.getUserCommerce()
        self.email = self.userCommerce["emails"]["primary"]["email"]

        # print(self.userCommerce["address"])
        # for i in self.userCommerce["address"]:
        #     address = self.userCommerce["address"][i]
        #     break
        address = self.userCommerce["address"]["shipping"]
        state = address["province"]
        city = address["locality"]
        county = address["zone"]
        address1 = address["line1"]
        # line2 is optional; fall back to a single space when it is missing
        address2 = address.get("line2", " ")
        postalCode = address["code"]
        country = address["country"]
        self.addressInfo = {"state": state, "city": city, "address1": address1, "postalCode": postalCode,
                            "address2": address2, "county": county, "country": country}

        name = address["name"]["primary"]
        lastName = name["given"]
        firstName = name["family"]
        self.recipientInfo = {"lastName": lastName, "firstName": firstName}

        phone = address["phone"]["primary"]
        self.contactInfo = {"phoneNumber": phone, "email": self.email}

        # Prepare the parameters needed for the final purchase request
        self.launchRecipient = {"lastName": lastName, "firstName": firstName, "email": self.email,
                                "phoneNumber": phone}

        self.launchAddress = {"state": state, "city": city, "address1": address1, "county": county, "country": "CN"}

    # Get the current time in UTC (the server's time zone)
    def getUtcTime(self):
        utcTime = time.gmtime()
        return utcTime

    # Get the most recently released products
    def gwtProductFeed(self):
        response = requests.get(
            "https://api.nike.com/commerce/productfeed/products/snkrs/threads?country=CN&limit=5&locale=zh_CN&skip=0&withCards=true").json()
        return response

    # Get new notifications
    def getNotifications(self):
        times = time.strftime("%Y-%m-%dT%H%%3A%M%%3A%S.000%%2B0000", self.times)
        url = "https://api.nike.com/plus/v3/notifications/me/stored?since={time}&limit=10&locale=zh-Hans_CN".format(
            time=times)
        headers = {
            "deliveryId": "com.nike.onenikecommerce",
            "appid": "com.nike.commerce.snkrs.ios",
            "Authorization": "Bearer " + self.token,
            "User-Agent": self.apiUserAgent,
            "X-NewRelic-ID": self.xNewRelicID
        }
        response = requests.get(url, headers=headers).json()
        return response

    # Get the order history
    def getOrderHistory(self):
        url = "https://api.nike.com/commerce/ap/orderhistory?action=getOrderHistoryList&country=CN"
        headers = {
            "X-NewRelic-ID": self.xNewRelicID,
            "User-Agent": self.apiUserAgent,
            "Authorization": "Bearer " + self.token
        }
        response = requests.get(url, headers=headers).text
        return response

    # Refresh the login credential (access token)
    def getTokenRefresh(self):
        url = "https://unite.nikecloud.com/tokenRefresh?backendEnvironment=identity&locale=zh_CN&mobile=true&native=true&uxId=com.nike.commerce.snkrs.ios&sdkVersion=2.8.4&backendEnvironment=identity&platform=ios&browser=uniteSDK"
        data = {"client_id": self.clientId, "grant_type": "refresh_token",
                "refresh_token": self.refreshToken}
        r = requests.post(url, json=data).json()
        return r["access_token"]

    # Get the user's basic profile information
    def getUserCommerce(self):
        url = "https://api.nike.com/user/commerce"
        headers = {
            "X-NewRelic-ID": self.xNewRelicID,
            "User-Agent": self.apiUserAgent,
            "Authorization": "Bearer " + self.token,
            "X-NIKE-UX-ID": "com.nike.commerce.snkrs.ios"
        }
        response = requests.get(url, headers=headers).json()
        return response

    def setShippingOptionsId(self):
        url = "https://api.nike.com/buy/shipping_options/v2"
        headers = {
            "Accept": "application/json",
            "Authorization": "Bearer " + self.token,
            "Content-Type": "application/json; charset=utf-8",
            "x-nike-caller-id": self.xNikeCallerId,
            "User-Agent": self.apiUserAgent,
            "X-NewRelic-ID": self.xNewRelicID
        }
        data = {"items": [{"id": self.shippingId, "shippingAddress": self.addressInfo, "skuId": self.skuId}],
                "currency": "CNY", "country": "CN"}
        response = requests.post(url, headers=headers, json=data).json()
        return response

    def setCheckoutId(self):
        url = "https://api.nike.com/buy/checkout_previews/v2/" + self.checkoutId
        headers = {
            "Accept": "application/json",
            "Authorization": "Bearer " + self.token,
            "Content-Type": "application/json",
            "x-nike-caller-id": self.xNikeCallerId,
            "User-Agent": self.apiUserAgent,
            "X-NewRelic-ID": self.xNewRelicID
        }
        data = {"request": {"email": self.email,
                            "clientInfo": {"deviceId": "", "client": "com.nike.commerce.snkrs.ios"}, "currency": "CNY",
                            "items": [{"recipient": self.recipientInfo,
                                       "shippingAddress": self.addressInfo,
                                       "id": self.shippingId, "quantity": 1,
                                       "skuId": self.skuId,
                                       "shippingMethod": "GROUND_SERVICE",
                                       "contactInfo": self.contactInfo}],
                            "channel": "SNKRS", "locale": "zh_CN", "country": "CN"}}
        response = requests.put(url, headers=headers, json=data).json()
        return response

    def getPriceChecksum(self):
        url = "https://api.nike.com/buy/checkout_previews/v2/jobs/" + self.checkoutId
        headers = {
            "Accept": "application/json",
            "X-NewRelic-ID": self.xNewRelicID,
            "x-nike-caller-id": self.xNikeCallerId,
            "Authorization": "Bearer " + self.token,
            "User-Agent": self.apiUserAgent
        }
        response = requests.get(url, headers=headers).json()
        totalPrice = response["response"]["totals"]["total"]
        priceChecksum = response["response"]["priceChecksum"]
        return totalPrice, priceChecksum

    def getPaymentToken(self, totalPrice):
        url = "https://api.nike.com/payment/preview/v2"
        headers = {
            "Accept": "application/json; charset=utf-8",
            "Authorization": "Bearer " + self.token,
            "Content-Type": "application/json",
            "x-nike-caller-id": self.xNikeCallerId,
            "User-Agent": self.apiUserAgent,
            "X-NewRelic-ID": self.xNewRelicID
        }
        data = {"total": totalPrice, "items": [{"productId": self.productId,
                                                "shippingAddress": self.addressInfo}],
                "checkoutId": self.checkoutId, "currency": "CNY", "paymentInfo": [
                {"id": self.paymentsId, "type": "Alipay",
                 "billingInfo": {"name": self.recipientInfo,
                                 "contactInfo": self.contactInfo,
                                 "address": self.addressInfo}}], "country": "CN"}
        response = requests.post(url, headers=headers, json=data).json()
        paymentToken = response["id"]
        return paymentToken

    def getDataFromResponse(self, data):
        # Split only on the first blank line, which separates headers from body
        data = json.loads(data.split("\r\n\r\n", 1)[1])
        return data

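    # Send a raw HTTPS request to self.host and return the full response text
    def sendRequestsToHost(self, data):
        # ssl.wrap_socket() was removed in Python 3.12; use an SSLContext instead
        context = ssl.create_default_context()
        sock = context.wrap_socket(socket.socket(), server_hostname=self.host)
        sock.connect((self.host, 443))

        sock.sendall(bytes(data, encoding='utf-8'))
        chunks = []
        while True:
            try:
                recv_data = sock.recv(2048)
            except socket.error as err_msg:
                print('Error receiving data: %s' % err_msg)
                break
            if not recv_data:
                break
            chunks.append(recv_data)
        sock.close()
        # Decode once at the end so multi-byte characters split across chunks survive
        return b"".join(chunks).decode('utf-8')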

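    # Send a raw HTTPS request to self.apiHost and return the full response text
    def sendRequestsToApiHost(self, data):
        # ssl.wrap_socket() was removed in Python 3.12; use an SSLContext instead
        context = ssl.create_default_context()
        sock = context.wrap_socket(socket.socket(), server_hostname=self.apiHost)
        sock.connect((self.apiHost, 443))

        sock.sendall(bytes(data, encoding='utf-8'))
        chunks = []
        while True:
            try:
                recv_data = sock.recv(2048)
            except socket.error as err_msg:
                print('Error receiving data: %s' % err_msg)
                break
            if not recv_data:
                break
            chunks.append(recv_data)
        sock.close()
        # Decode once at the end so multi-byte characters split across chunks survive
        return b"".join(chunks).decode('utf-8')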

    def launchEntrie(self, paymentToken, priceChecksum):
        url = "https://api.nike.com/launch/entries/v2"
        headers = {
            "Authorization": "Bearer " + self.token,
            "Content-Type": "application/json",
            "x-nike-caller-id": self.xNikeCallerId,
            "User-Agent": self.apiUserAgent,
            "X-NewRelic-ID": self.xNewRelicID
        }
        data = {"deviceId": "", "postpayLink": self.postpayLink, "checkoutId": self.checkoutId, "currency": "CNY",
                "paymentToken": paymentToken,
                "shipping": {"recipient": self.launchRecipient, "method": "GROUND_SERVICE",
                             "address": self.launchAddress}, "skuId": self.skuId, "channel": "SNKRS",
                "launchId": self.launchId, "locale": "zh_CN", "priceChecksum": priceChecksum}
        response = requests.post(url, headers=headers, json=data).json()
        return response
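
A small aside on getNotifications: its `since` value is built by hand-escaping `:` and `+` inside the strftime format string. The sketch below shows what looks like an equivalent using `urllib.parse.quote`, which may be easier to read. `format_since` is a hypothetical helper name, not something from the original code.

```python
import time
from urllib.parse import quote


def format_since(utc_struct):
    """Format a UTC struct_time as a URL-encoded ISO 8601 timestamp."""
    plain = time.strftime("%Y-%m-%dT%H:%M:%S.000+0000", utc_struct)
    # safe="" makes quote() escape ":" and "+" as %3A and %2B
    return quote(plain, safe="")


example = time.gmtime(0)  # 1970-01-01 00:00:00 UTC
encoded = format_since(example)
# The hand-escaped format string used in getNotifications, for comparison
manual = time.strftime("%Y-%m-%dT%H%%3A%M%%3A%S.000%%2B0000", example)
```

Both approaches produce the same string for the epoch example.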
-----------------
import json
import re

import requests


def getPostpayLink(productId, shoesLink):
    url = "https://api.nike.com/merch/products/v2/" + productId
    response = requests.get(url).json()
    postpayLink = shoesLink + "?LEStyleColor={styleColor}&LEPaymentType=Alipay".format(
        styleColor=response["styleColor"])
    return postpayLink


def buySet(shoesUrl):
    response = requests.get(shoesUrl).text

    launchId = re.findall(r"\"launchViewId\":\"(.*?)\"", response, re.S | re.I)[0]
    size = re.findall(r"\"sizes\":(.*?),\"_fetchedAt\"", response, re.S | re.I)[0]
    productId = re.findall(r"product\":{\"productId\":\"(.*?)\"", response, re.S | re.I)[0]

    shoesLink = re.findall(r"canonical\" href=\"(.*?)\"", response, re.S | re.I)[0]
    postpayLink = getPostpayLink(productId, shoesLink)

    stock = []
    outOfStock = []

    j_size = json.loads(size)

    for s in j_size:
        if j_size[s]["available"]:
            stock.append(j_size[s]["localizedSize"])
        else:
            outOfStock.append(j_size[s]["localizedSize"])
    stock.sort()
    outOfStock.sort()

    if len(outOfStock) > 0:
        print("Out of stock: " + " ".join(outOfStock) + "\nPlease choose from: " + " ".join(stock))
    else:
        print("All sizes in stock\nPlease choose from: " + " ".join(stock))

    shoesSize = input("-->")

    for s in j_size:
        if j_size[s]["localizedSize"] == shoesSize:
            usShoesSize = s
            skuId = j_size[s]["skuId"]

    # print("Please set a default shipping address")

    # print("Choose a payment method: 1. UnionPay 2. WeChat Pay 3. Alipay")
    # purMethod = input("-->")

    # shoesBuyUrl = shoesUrl + "/?productId=" + launchId + "&size=" + usShoesSize

    return shoesSize, launchId, skuId, productId, postpayLink
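
One detail in buySet worth illustrating: it sorts the size labels as strings, so "10" sorts before "9", and an unrecognized size leaves skuId undefined. The sketch below separates the size handling into two small functions that sort numerically and return None for an unknown size. `split_sizes`, `find_sku`, and the sample data are made up for illustration; only the keys `available`, `localizedSize`, and `skuId` come from the code above.

```python
def split_sizes(sizes):
    """Return (in_stock, out_of_stock) lists of localized size labels."""
    in_stock, out_of_stock = [], []
    for info in sizes.values():
        target = in_stock if info["available"] else out_of_stock
        target.append(info["localizedSize"])
    # Sort numerically so that "9.5" comes before "10"
    # (assumes every label parses as a number)
    in_stock.sort(key=float)
    out_of_stock.sort(key=float)
    return in_stock, out_of_stock


def find_sku(sizes, localized_size):
    """Return the skuId for a localized size label, or None if it is unknown."""
    for info in sizes.values():
        if info["localizedSize"] == localized_size:
            return info["skuId"]
    return None


# Made-up sample data in the shape that buySet reads
sample_sizes = {
    "a": {"available": True, "localizedSize": "10", "skuId": "sku-10"},
    "b": {"available": True, "localizedSize": "8", "skuId": "sku-8"},
    "c": {"available": False, "localizedSize": "9", "skuId": "sku-9"},
    "d": {"available": True, "localizedSize": "9.5", "skuId": "sku-95"},
}
in_stock, out_of_stock = split_sizes(sample_sizes)
chosen = find_sku(sample_sizes, "9.5")
missing = find_sku(sample_sizes, "11")
```

Returning None lets the caller prompt again rather than fail on the return statement.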

Finally, two processes are used: one handles purchasing and the other monitors the results.

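import sqlite3
import threading
import time
import traceback
from multiprocessing import Process, Queue

import requests

# BuySnkrs and buySet come from the listings above


if __name__ == '__main__':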
    shoesSize, launchId, skuId, productId, postpayLink = buySet(
        "https://www.nike.com/cn/launch/t/air-revaderchi-gym-red-mink-brown/")

    threads = int(input("Number of threads for purchasing and monitoring (choose based on the number of accounts): "))

    userInfoList = []


    def bulkPurchase(orderQueue):
        def buy(refreshToken, username, password):
            try:
                app = BuySnkrs(refreshToken, skuId, launchId, productId, postpayLink)

                setShippingOptionsIdResponse = app.setShippingOptionsId()
                # print(setShippingOptionsIdResponse)

                setCheckoutIdResponse = app.setCheckoutId()
                # print(setCheckoutIdResponse)

                totalPrice, priceChecksum = app.getPriceChecksum()
                # print(totalPrice, priceChecksum)

                paymentToken = app.getPaymentToken(totalPrice)
                # print(paymentToken)

                launchEntrie = app.launchEntrie(paymentToken, priceChecksum)

                userInfo = [refreshToken, launchEntrie["id"], username, password]
                orderQueue.put(userInfo)
            except:
                traceback.print_exc()
            finally:
                td.release()

        conn = sqlite3.connect("nike.db")
        conn.text_factory = str
        c = conn.cursor()
        allUser = c.execute("SELECT * from nike").fetchall()

        td = threading.BoundedSemaphore(threads)
        threadlist = []
        for user in allUser:
            username = user[2]
            password = user[3]
            refreshToken = user[1]

            td.acquire()
            t = threading.Thread(target=buy, args=(refreshToken, username, password))
            t.start()
            threadlist.append(t)
        for x in threadlist:
            x.join()

        c.close()


    def orderStatus(orderQueue):
        def checkStatus(u):
            try:
                token, order, usr, pwd = u
                url = "https://unite.nikecloud.com/tokenRefresh?backendEnvironment=identity&locale=zh_CN&mobile=true&native=true&uxId=com.nike.commerce.snkrs.ios&sdkVersion=2.8.4&backendEnvironment=identity&platform=ios&browser=uniteSDK"
                data = {"client_id": "G64vA0b95ZruUtGk1K0FkAgaO3Ch30sj", "grant_type": "refresh_token",
                        "refresh_token": token}
                r = requests.post(url, json=data).json()
                accessToken = r["access_token"]
                headers = {"authorization": "Bearer " + accessToken}
                response = requests.get("https://api.nike.com/launch/entries/v2/" + order, headers=headers).json()
                print(usr, response)
                try:
                    result = response["result"]
                except:
                    return
                if result["status"] == "WINNER":
                    print("Purchase succeeded, account: %s password: %s (log in to the mobile app to view the order)" % (usr, pwd))
                    userInfoList.remove(u)
                elif result["status"] == "NON_WINNER":
                    print("%s purchase failed" % usr)
                    userInfoList.remove(u)
            except:
                traceback.print_exc()
            finally:
                td.release()

        while True:
            if not orderQueue.empty():
                userInfo = orderQueue.get(True)
                userInfoList.append(userInfo)
            else:
                if len(userInfoList) != 0:
                    td = threading.BoundedSemaphore(threads)
                    threadlist = []
                    for u in userInfoList:
                        td.acquire()
                        t = threading.Thread(target=checkStatus, args=(u,))
                        t.start()
                        threadlist.append(t)
                    for x in threadlist:
                        x.join()
                else:
                    time.sleep(2)


    orderQueue = Queue()
    buy = Process(target=bulkPurchase, args=(orderQueue,))
    status = Process(target=orderStatus, args=(orderQueue,))
    buy.start()
    status.start()
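
Both worker functions above use the same pattern: a BoundedSemaphore caps how many threads run at once, and results are handed over through a queue. Stripped of the purchase logic, the pattern can be sketched as below. To stay self-contained the sketch uses only threads and `queue.Queue` rather than two processes, and `run_bounded` and `work` are hypothetical names.

```python
import queue
import threading


def run_bounded(items, work, limit):
    """Run work(item) in threads, at most `limit` at a time; return the results."""
    results = queue.Queue()
    slots = threading.BoundedSemaphore(limit)

    def worker(item):
        try:
            results.put(work(item))
        finally:
            # Always free the slot, even if work() raised
            slots.release()

    threads = []
    for item in items:
        slots.acquire()  # blocks while `limit` workers are still running
        t = threading.Thread(target=worker, args=(item,))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()

    # All workers have finished, so the queue can be drained safely
    collected = []
    while not results.empty():
        collected.append(results.get())
    return collected


squares = sorted(run_bounded(range(5), lambda n: n * n, limit=2))
```

Releasing the semaphore in a `finally` block, as the original code also does, keeps a failing worker from permanently using up a slot.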
Written just for fun; this is only a brief record.

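Comments:

cha0scat
2018-09-13 22:57
I see several functions that are never used, or defined more than once with identical contents...
Could I get a copy of the source files? I can't reproduce this...
Using the first and second code listings,
print(LoginInit('888888','888888').getLoginRequests())
with that and
the cookies from WebLogin
I got two sets of data,
but neither contained a userid or refreshtoken

So... could I get a copy of the source code? [email protected] or [email protected]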


Powered by xia0yu