grpc执行–go_out报错

执行命令:

protoc --go_out=./   ./a.proto

报错:

ERROR: 2020/10/21 11:54:10 [profiling] error parsing flags: when -address isn't specified, you must include -stream-stats-catapult-json
--go_out: protoc-gen-go: Plugin failed with status code 1.

解决:(原因:找不到protoc-gen-go可执行文件路径)

protoc --plugin=protoc-gen-go=../../bin/protoc-gen-go.exe --go_out=plugins=grpc:./ ./helloworld/helloworld.p
roto

简单实现grpc通信

仿照别人代码实现

1.新建python项目,创建文件夹

编辑helloworld.proto文件

syntax = "proto3";

package rpc_package;

// define a service
service HelloWorldService {
    // define the interface and data type
    rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// define the data type of request
message HelloRequest {
    string name = 1;
}

// define the data type of response
message HelloReply {
    string message = 1;
}

执行,生成rpc_package文件夹下的两个文件

python -m grpc_tools.protoc -I=./protos --python_out=./rpc_package
 --grpc_python_out=./rpc_package ./protos/helloworld.proto
2.安装模块
python -m pip install grpcio 
python -m pip install grpcio-tools
3.编辑server.py和lient.py文件

hello_server.py

#!/usr/bin/env python
# -*-coding: utf-8 -*-

from concurrent import futures
import grpc
import logging
import time

from rpc_package.helloworld_pb2_grpc import add_HelloWorldServiceServicer_to_server, \
    HelloWorldServiceServicer
from rpc_package.helloworld_pb2 import HelloRequest, HelloReply


class Hello(HelloWorldServiceServicer):

    # 这里实现我们定义的接口
    def SayHello(self, request, context):
        return HelloReply(message='Hello, %s!' % request.name)


def serve():
    # 这里通过thread pool来并发处理server的任务
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    # 将对应的任务处理函数添加到rpc server中
    add_HelloWorldServiceServicer_to_server(Hello(), server)

    # 这里使用的非安全接口,世界gRPC支持TLS/SSL安全连接,以及各种鉴权机制
    server.add_insecure_port('[::]:50000')
    server.start()
    try:
        while True:
            time.sleep(60 * 60 * 24)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == "__main__":
    logging.basicConfig()
    serve()

编辑hello_client.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function
import logging

import grpc
from rpc_package.helloworld_pb2 import HelloRequest, HelloReply
from rpc_package.helloworld_pb2_grpc import HelloWorldServiceStub

def run():
    # 使用with语法保证channel自动close
    with grpc.insecure_channel('localhost:50000') as channel:
        # 客户端通过stub来实现rpc通信
        stub = HelloWorldServiceStub(channel)

        # 客户端必须使用定义好的类型,这里是HelloRequest类型
        response = stub.SayHello(HelloRequest(name='eric'))
    print ("hello client received: " + response.message)

if __name__ == "__main__":
    logging.basicConfig()
    run()
4.执行

窗口1,先执行python hello_server.py

窗口2,执行 python hello_client.py

输出  hello client received: Hello, eric!   成功

mysql表range分区

按天分区
CREATE TABLE `day` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `create_time` DATETIME NOT NULL COMMENT '入库时间',
  PRIMARY KEY (`id`,`create_time`),
) ENGINE=INNODB AUTO_INCREMENT=4190 DEFAULT CHARSET=utf8 COMMENT='网点系统接口请求日志表'
/*!50500 PARTITION BY RANGE  COLUMNS(create_time)
(PARTITION p20200904 VALUES LESS THAN ('2020-09-05') ENGINE = InnoDB,
 PARTITION p20200905 VALUES LESS THAN ('2020-09-06') ENGINE = InnoDB,
 PARTITION p20200906 VALUES LESS THAN ('2020-09-07') ENGINE = InnoDB,
 PARTITION p20200907 VALUES LESS THAN ('2020-09-08') ENGINE = InnoDB,
 PARTITION pmax VALUES LESS THAN (MAXVALUE) ENGINE = InnoDB) */

继续阅读“mysql表range分区”

php,go,go协程实现斐波那契数列用时比较长度50.

php:50长度时请求超时,长度40时用时在一分钟

//实现1000斐波那契数列
$start =  microtime();
for($i=0;$i<50;$i++){
    echo getfei($i)."<br>";
}

function getfei($i){
    if ($i<=1){
        $fei = 1;
    }else{
        $fei = getfei($i-1)+getfei($i-2);
    }
    return $fei;
}
echo microtime()-$start;

go递归用时:2m23.9160774s

package main

import (
   "fmt"
   "time"
)

//递归实现斐波那契数列
func main() {
   t := time.Now()
   var i int = 0
   for i = 0; i < 50; i++ {
      fe := fei(i)
      fmt.Printf(" vale is %v\n", fe)
   }
   defer func() {
      cost := time.Since(t)
      fmt.Println("cost=", cost)
   }()

}
func fei(a int) (f int) {

   if a <= 1 {
      f = 1
   } else {
      f = fei(a-1) + fei(a-2)
   }
   return
}

go协程用时:1.9949ms

package main

import (
   "fmt"
   "time"
)

func feibonacci(ch chan<- int, quit <-chan bool) {
   x, y := 1, 1
   for {
      select {
      case ch <- x:
         x, y = y, x+y
      case flag := <-quit:
         fmt.Println("flag =", flag)
         return
      }

   }
}
func main() {
   t := time.Now()
   ch := make(chan int)
   quit := make(chan bool)
   go func() {
      for i := 0; i < 50; i++ {
         num := <-ch
         fmt.Println(num)
      }
      quit <- true
   }()
   feibonacci(ch, quit)
   defer func() {
      cost := time.Since(t)
      fmt.Println("cost=", cost)
   }()

}

综上,php用时最长,go协程用时最短毫秒级的

golang基础

在函数调用时,像切片(slice)、字典(map)、接口(interface)、通道(channel)这样的引用类型 都是默认使用引用传递(即使没有显示的指出指针)

切片提供了计算容量的函数 cap() 可以测量切片最长可以达到多少:它等于切片的长度 + 数组除切片之外的长度。如果 s 是一个切片, cap(s) 就是从 s[0] 到数组末尾的数组长度。切片的长度永远不会超过它的容量,所以对于 切片 s 来说该不等式永远成立: 0 <= len(s) <= cap(s) 。

s := make([]byte, 8)
fmt.Println(len(s))//5
fmt.Println(cap(s))//5
s = s[2:4]
fmt.Println(len(s))//2
fmt.Println(cap(s))//6

如果想知道当前的内存状态,可以使用:

func main() {
   //自身占用 单位KB
   memStat := new(runtime.MemStats)
   runtime.ReadMemStats(memStat)
   mem := uint64(10)
   mem = memStat.Alloc
   fmt.Printf("%d\n", mem/1024)
}

go问题

执行go fmt或go build命令报错:

Get “https://proxy.golang.org/github.com/gin-gonic/gin/@v/list”: dial tcp 172.217.160.81:443: connectex: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond

解决:执行命令:

go env -w GOPROXY=https://goproxy.cn

go安装gin框架

1,把下载好的包放到go安装目录src下
2.代码:test.go  运行>go run test.go
package main

import "github.com/gin-gonic/gin"

func main() {
   r := gin.Default()
   r.GET("/ping", func(c *gin.Context) {
      c.JSON(200, gin.H{
         "message": "pong",
      })
   })
   r.Run() // listen and serve on 0.0.0.0:8080
}
3.浏览器输入http://127.0.0.1:8080/ping

显示:

{"message":"pong"}

linux超线程

超线程是英特尔开发出来的一项技术,使得单个处理器可以象两个逻辑处理器那样运行,这样单个处理器以并行执行线程。

非超线程:cpu线程数=物理CPU个数*每个物理CPU的逻辑核数*1

超线程:cpu线程数=物理CPU个数*每个物理CPU的逻辑核数*2

物理CPU个数:[root@localhost ~]# cat /proc/cpuinfo | grep “physical id” | sort | uniq

每个物理CPU的逻辑核数:[root@localhost ~]# cat /proc/cpuinfo | grep “cores” | uniq

系统整个cpu线程数: cat /proc/cpuinfo | grep “processor” | wc -l

查看内核信息:[root@localhost ~]# cat /proc/cpuinfo | grep “model name” | uniq

根据physical id信息可以判断哪些逻辑核在同一个物理核上,因为同一物理核上的逻辑核的physical id相等;而根据core id信息又可以判断哪两个cpu线程跑在同一个逻辑核上,因为跑在同一逻辑核上的cpu线程的core id相等,但是,由于不同物理核上的逻辑核core id可以相等,所以在进行第二个判断(即哪两个cpu线程跑在同一个逻辑核上)前需要先进行第一个判断(即哪些逻辑核在同一个物理核上)

go环境搭建

安装GO

1.下载安装包go1.15.windows-amd64.msi点击安装到自定义目录

2.配置:新加环境系统变量

PATH  D:\desktoptool\go\bin  //可执行文件目录

GOPATH  D:\desktoptool\gocode //代码目录

GOROOT  D:\desktoptool\go  //安装目录

3.cmd窗口输入下面命令查看:

D:\desktoptool\gocode>go version
go version go1.15 windows/amd64

D:\desktoptool\gocode>go env
set GO111MODULE=
set GOARCH=amd64
set GOBIN=
set GOCACHE=C:\Users\14065\AppData\Local\go-build
set GOENV=C:\Users\14065\AppData\Roaming\go\env
set GOEXE=.exe
set GOFLAGS=
set GOHOSTARCH=amd64
set GOHOSTOS=windows
set GOINSECURE=
set GOMODCACHE=C:\Users\14065\go\pkg\mod
set GONOPROXY=
set GONOSUMDB=
set GOOS=windows
set GOPATH=C:\Users\14065\go
set GOPRIVATE=
set GOPROXY=https://proxy.golang.org,direct
set GOROOT=D:\desktoptool\go
set GOSUMDB=sum.golang.org
set GOTMPDIR=
set GOTOOLDIR=D:\desktoptool\go\pkg\tool\windows_amd64
set GCCGO=gccgo
set AR=ar
set CC=gcc
set CXX=g++
set CGO_ENABLED=1
set GOMOD=
set CGO_CFLAGS=-g -O2
set CGO_CPPFLAGS=
set CGO_CXXFLAGS=-g -O2
set CGO_FFLAGS=-g -O2
set CGO_LDFLAGS=-g -O2
set PKG_CONFIG=pkg-config
set GOGCCFLAGS=-m64 -mthreads -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -fdebug-prefix-map=C:\Users\14065\AppData\Local\Temp\go-build611408406=/tmp/go-build -gno-record-gcc-switches

安装IDE goland

 

 

 

 

 

kafka分区

首先我们来看下官网的图示,kafka分区的作用个人觉得就是提供一种负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。

分区策略

所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略。

如果要自定义分区策略,你需要显式地配置生产者端的参数partitioner.class。这个参数该怎么设定呢?方法很简单,在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。这个接口也很简单,只定义了两个方法:partition()close(),通常你只需要实现最重要的 partition 方法。我们来看看这个方法的方法签名:

		int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
  • 这里的topickeykeyBytesvaluevalueBytes都属于消息数据,cluster则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。只要你自己的实现类定义好了 partition 方法,同时设置partitioner.class参数为你自己实现类的 Full Qualified Name,那么生产者程序就会按照你的代码逻辑对消息进行分区。虽说可以有无数种分区的可能,但比较常见的分区策略也就那么几种,下面我来详细介绍一下。
轮询策略

也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0,就像下面这张图展示的那样。

在这里插入图片描述

这就是所谓的轮询策略。轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果你未指定partitioner.class参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。

轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。

随机策略

也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。

在这里插入图片描述

如果要实现随机策略版的 partition 方法,很简单,只需要两行代码即可:

		List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
		return ThreadLocalRandom.current().nextInt(partitions.size());

先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。

本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。

按消息key保存策略

也称 Key-ordering 策略。这个可以理解为是自定义的策略之一。

Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示。

在这里插入图片描述

实现这个策略的 partition 方法同样简单,只需要下面两行代码即可:

		List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
		return Math.abs(key.hashCode()) % partitions.size();

前面提到的 Kafka 默认分区策略实际上同时实现了两种策略:如果指定了 Key,那么默认实现按消息Key保序策略;如果没有指定 Key,则使用轮询策略。

其他分区策略

上面这几种分区策略都是比较基础的策略,除此之外你还能想到哪些有实际用途的分区策略?其实还有一种比较常见的,即所谓的基于地理位置的分区策略。当然这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群

假设有个厂商所有服务都部署在北京的一个机房(这里我假设它是自建机房,不考虑公有云方案。其实即使是公有云,实现逻辑也差不多),现在该厂商考虑在南方找个城市(比如深圳)再创建一个机房;另外从两个机房中选取一部分机器共同组成一个大的 Kafka 集群。显然,这个集群中必然有一部分机器在北京,另外一部分机器在深圳。

假设该厂商计划为每个新注册用户提供一份注册礼品,比如南方的用户注册的可以免费得到一碗“甜豆腐脑”,而北方的新注册用户可以得到一碗“咸豆腐脑”。如果用 Kafka 来实现则很简单,只需要创建一个双分区的主题,然后再创建两个消费者程序分别处理南北方注册用户逻辑即可。

但问题是你需要把南北方注册用户的注册消息正确地发送到位于南北方的不同机房中,因为处理这些消息的消费者程序只可能在某一个机房中启动着。换句话说,送甜豆腐脑的消费者程序只在深圳机房启动着,而送咸豆腐脑的程序只在北京的机房中,如果你向深圳机房中的 Broker 发送北方注册用户的消息,那么这个用户将无法得到礼品!

此时我们就可以根据 Broker 所在的 IP 地址实现定制化的分区策略。比如下面这段代码:

		List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
		return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();

我们可以从所有分区中找出那些 Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送。

切记分区是实现负载均衡以及高吞吐量的关键,所以一定要在生产者这一端就要考虑好合适的分区策略,避免造成消息数据的“倾斜”,使得某些分区成为性能瓶颈,从而导致下游数据消费的性能下降的问题