When you read an execution plan you’re probably trying to identify the steps that Oracle went through to acquire the final result set so that you can decide whether or not there is a more efficient way of getting the same result.
For a serial execution plan this typically means you have to identify the join order, join methods and access methods together with the point at which each predicate was applied. Generally this isn’t too difficult, although subquery pushing (and a couple of presentation bugs relating to other uses of scalar subqueries) can cause a little confusion; and the difference between join order and the order of operation can be slightly obscured when considering hash joins.
Parallel execution plans are harder, though, because you really need to understand the impact of the order of operation, the distribution mechanisms chosen, and (in recent versions of Oracle) the timing of the generation and use of Bloom filters. The topic is stunningly large and easy to describe badly; it’s also going to be easy to make generalisations that turn out to be untrue or (at least) sufficiently incomplete as to be misleading. Rather than attempting to cover the topic in one note, I think I’m going to end up writing two or three.
I’ll be basing my comments on the two execution plans I produced for a recent question on Bloom filters, so you may want to read that article and its subsequent comments first.
The ideal target for parallel execution is linear scalability. In other words, if you execute a statement with parallel degree N then the workload should be shared N ways with no interference or duplication, and the job should complete in 1/Nth of the time. There are immediately four caveats as far as Oracle’s implementation is concerned:
- there’s a setup overhead for parallel execution that eats into the time-saving (so we shouldn’t bother using parallel execution on queries that are only going to take a couple of seconds anyway).
- in the case of a parallel query all the data has to pass to the front-end through the query co-ordinator, so there’s an implicit bottleneck that can conceal a huge fraction of the time-saving.
- a statement running at degree N is typically using 2*N parallel execution slaves, so you might wonder whether you deserve a “2*N” improvement in response time, rather than just N.
- if you’re running on the Exadata database machine, a very large I/O requirement can be split across all the storage servers concurrently, so even a serial query on a full rack (14 storage servers) can, in effect, get pretty close to executing at parallel degree 14 (provided you don’t want to do very much with the data once you’ve ripped it from the storage servers).
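The first two caveats can be captured in a deliberately crude timing model. This is my own simplification, not anything Oracle publishes: it ignores slave-to-slave messaging and contention, and treats $T_{\text{work}}$ as the part of the serial run time that can be shared among slaves, $T_{\text{setup}}$ as the fixed cost of starting parallel execution, and $T_{\text{QC}}$ as the time spent passing the result set through the query co-ordinator, which does not shrink as N grows:

$$
T_{\text{serial}} \approx T_{\text{work}} + T_{\text{QC}}, \qquad
T(N) \approx T_{\text{setup}} + \frac{T_{\text{work}}}{N} + T_{\text{QC}}
$$

With purely illustrative figures of $T_{\text{work}} = 2$ s, $T_{\text{setup}} = 0.5$ s, $T_{\text{QC}} \approx 0$ and $N = 8$, this gives $T(8) \approx 0.5 + 2/8 = 0.75$ s, a speed-up of about 2.7 rather than 8. However large N gets, the model caps the speed-up at $(T_{\text{work}} + T_{\text{QC}}) / (T_{\text{setup}} + T_{\text{QC}})$.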
Having mentioned the caveats, I’m going to ignore them and stick to a simplified image of two sets of parallel query slaves engaged in some fairly simple query processing and ask how Oracle shares the work while avoiding contention and duplicated effort.
Key to doing “1/Nth” of the work is getting each PX slave to do 1/Nth of the necessary disk I/O. For a tablescan the query co-ordinator (QC) cuts the table into N pieces (strictly speaking it uses N * 13 pieces, so that it can share the work N ways while allowing for an imbalance in content; the 13 comes from a historic 9:3:1 strategy); for access to partitioned tables the query co-ordinator may share out the partitions. In either case the co-ordinator can arrange to share the disk I/O fairly without collision or duplication of effort. That’s a basic first step.
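To make the idea of granules concrete, here is a small Python simulation. It is a sketch under my own assumptions, not Oracle’s code: the table is modelled as a contiguous range of block numbers, `make_granules`, `hand_out` and `skewed_cost` are hypothetical names, and I assume that a slave which finishes a granule simply takes the next one from the co-ordinator, which is what lets the extra pieces absorb uneven content.

```python
from collections import deque


def make_granules(total_blocks, degree, pieces_per_slave=13):
    """Cut blocks [0, total_blocks) into degree * pieces_per_slave
    contiguous (start, end) ranges whose sizes differ by at most one block."""
    count = degree * pieces_per_slave
    base, extra = divmod(total_blocks, count)
    granules, start = [], 0
    for i in range(count):
        size = base + (1 if i < extra else 0)
        granules.append((start, start + size))
        start += size
    return granules


def hand_out(granules, degree, cost):
    """Hypothetical QC loop: the slave that becomes free first gets the
    next granule. cost(granule) is the time needed to scan that granule."""
    busy_until = [0.0] * degree
    assignment = [[] for _ in range(degree)]
    queue = deque(granules)
    while queue:
        slave = min(range(degree), key=lambda s: busy_until[s])
        granule = queue.popleft()
        assignment[slave].append(granule)
        busy_until[slave] += cost(granule)
    return assignment, busy_until


def skewed_cost(granule):
    """Blocks below 200 are five times as expensive to process (skewed content)."""
    start, end = granule
    return sum(5.0 if block < 200 else 1.0 for block in range(start, end))


fine = make_granules(1000, 4)                          # 4 * 13 = 52 granules
assignment, finish_times = hand_out(fine, 4, skewed_cost)
coarse = make_granules(1000, 4, pieces_per_slave=1)    # just N = 4 pieces
_, coarse_times = hand_out(coarse, 4, skewed_cost)
```

With only four pieces, the slave that draws blocks 0 to 249 finishes long after the others; with 52 smaller granules the expensive work is spread across whichever slaves happen to be free, and the finish times end up close together.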
But if we have to join table X and table Y, how do we ensure that the 1/Nth of table Y that a slave reads is the bit associated with the 1/Nth of table X that it has read so far? The answer is that we can’t, and that’s why distribution mechanisms are important and why (for simplicity, probably) Oracle generally ends up using 2 sets of N parallel execution slaves when running with degree N. One set of parallel execution slaves reads the first table as efficiently as possible and then (typically) distributes the data to the other set of parallel execution slaves using a mechanism that tries to anticipate the most effective strategy for reading the second set of data.
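A tiny Python experiment shows why “each slave joins its own 1/Nth of X with its own 1/Nth of Y” cannot work. The rows, the function names and the interleaved slicing (`rows[slave::degree]`) used as a stand-in for “1/Nth of a table” are all hypothetical; the point is only that the join partners for a row in one slice may well sit in a slice read by a different slave.

```python
def local_join(x_rows, y_rows):
    """Hash join of (key, value) rows: build on x_rows, probe with y_rows."""
    hash_table = {}
    for key, x_val in x_rows:
        hash_table.setdefault(key, []).append(x_val)
    return [(key, x_val, y_val)
            for key, y_val in y_rows
            for x_val in hash_table.get(key, [])]


def naive_parallel_join(x_rows, y_rows, degree):
    """Each 'slave' joins only its own 1/N of X with its own 1/N of Y."""
    result = []
    for slave in range(degree):
        result.extend(local_join(x_rows[slave::degree], y_rows[slave::degree]))
    return result


x_rows = [(k, f"x{k}") for k in range(10)]
y_rows = [(k, f"y{k}") for k in reversed(range(10))]   # same keys, stored in a different order
print(len(local_join(x_rows, y_rows)))                 # 10 rows from a serial join
print(len(naive_parallel_join(x_rows, y_rows, 2)))     # 0 rows: every join was "lost"
```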
So, for example, if the optimizer estimates that table X is 100MB but we’re only going to find 100 relevant rows in it, while table Y will return 100,000 rows from 10GB of data, the plan is likely to be:
- Slave set 1 – scan table X and copy every relevant row you find to every slave in slave set 2 (broadcast)
- Slave set 2 – build an in-memory hash table with the rowsource you get from slave set 1, then scan table Y and probe your hash table
We generate the right answer with this “(broadcast, none)” distribution because every relevant row from table X is known to every slave in the second slave set, so there is no risk of this distribution method “losing” a join between a row in table X and table Y that should have happened. The plan (probably) shows good scalability – i.e. time to complete is inversely proportional to degree N – because our two tables are both split into N separate areas with N slaves scanning them concurrently. The performance threat comes from the cost of duplicating the 100 rows to each of the slaves in the second set – which is probably a small overhead in CPU and memory, with a little contention due to the concurrent “cross-chat” between the two sets of slaves (every slave in set 1 spends a little time talking – or wanting to talk – to every slave in set 2).
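The two steps of the broadcast plan can be simulated in a few lines of Python. This is a sketch that assumes in-memory lists of `(join_key, value)` tuples as the “tables”, Python lists as the queues between slave sets, and interleaved slicing for each slave’s share of a scan; `broadcast_join` is a hypothetical name, not an Oracle routine. It also counts how many rows cross from set 1 to set 2, which is the duplication cost described above.

```python
def broadcast_join(x_rows, y_rows, degree):
    """Sketch of a (broadcast, none) hash join at parallel degree `degree`.
    Rows are (join_key, value) tuples; returns (joined rows, rows sent)."""
    # Slave set 1: each slave scans its 1/N of X and sends every row
    # to every slave in set 2 (broadcast).
    inbox = [[] for _ in range(degree)]
    for slave in range(degree):
        for row in x_rows[slave::degree]:
            for target in range(degree):
                inbox[target].append(row)
    rows_sent = sum(len(queue) for queue in inbox)

    # Slave set 2: build a hash table from the complete X rowsource,
    # then scan its own 1/N of Y (no redistribution) and probe.
    result = []
    for slave in range(degree):
        hash_table = {}
        for key, x_val in inbox[slave]:
            hash_table.setdefault(key, []).append(x_val)
        for key, y_val in y_rows[slave::degree]:
            for x_val in hash_table.get(key, []):
                result.append((key, x_val, y_val))
    return sorted(result), rows_sent


x_rows = [(k, f"x{k}") for k in range(5)]            # small table X
y_rows = [(k % 7, f"y{k}") for k in range(50)]       # larger table Y
joined, rows_sent = broadcast_join(x_rows, y_rows, 4)
```

Every Y row is probed by exactly one slave against a complete copy of X, so no join is lost; the price is that the 5 rows of X cross the slave boundary 4 times each.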
Alternatively, if the optimizer estimates that table X will return 100,000 rows and table Y will return 1,000,000, then it may decide that there is too much data coming from table X to follow the broadcast strategy, which would leave every slave in the second set receiving, and building a hash table from, 100,000 rows. In this case we may see:
- Slave set 1 – scan table X, apply a “randomising” hash function (such as mod(join_column, N)) to the join columns and use the result to share the data out evenly (we hope) among the second set of slaves (hash distribution)
- Slave set 2 – build a hash table in memory from the data it receives from slave set 1
- Slave set 1 – scan table Y, apply the same hash function to the join columns and again use the result to share the data out to the second set of slaves
- Slave set 2 – receive the incoming data, use it to probe the local in-memory hash table
In this case we know we will get the right answer because if two rows are supposed to join then the nature of the “(hash, hash)” distribution means that both rows will have been sent to the same parallel execution slave in slave set 2. Note that in this case slave set 1 has scanned both tables – but we still scan each table only once, and each scan is shared evenly between N non-interfering slaves. The performance trade-off is about memory, CPU and (particularly) inter-process contention.
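The four steps of the “(hash, hash)” plan can be sketched the same way. The assumptions are mine: integer join keys, the `mod(key, N)` routing function from the list standing in for whatever hash Oracle really uses, and `hash_hash_join` as a hypothetical name. The function also reports how many X rows each set-2 slave holds in its hash table.

```python
def hash_hash_join(x_rows, y_rows, degree):
    """Sketch of a (hash, hash) hash join. Rows are (join_key, value) tuples
    with integer keys; mod(key, degree) stands in for the real hash function."""
    def target(key):
        return key % degree

    # Slave set 1: scan X, send each row to exactly one slave in set 2.
    x_inbox = [[] for _ in range(degree)]
    for slave in range(degree):
        for row in x_rows[slave::degree]:
            x_inbox[target(row[0])].append(row)

    # Slave set 1: scan Y, routing each row with the same function.
    y_inbox = [[] for _ in range(degree)]
    for slave in range(degree):
        for row in y_rows[slave::degree]:
            y_inbox[target(row[0])].append(row)

    # Slave set 2: each slave builds a hash table from its share of X
    # and probes it with its share of Y; matching keys always meet here.
    result = []
    for slave in range(degree):
        hash_table = {}
        for key, x_val in x_inbox[slave]:
            hash_table.setdefault(key, []).append(x_val)
        for key, y_val in y_inbox[slave]:
            for x_val in hash_table.get(key, []):
                result.append((key, x_val, y_val))
    hash_table_sizes = [len(queue) for queue in x_inbox]
    return sorted(result), hash_table_sizes


x_rows = [(k, f"x{k}") for k in range(100)]
y_rows = [(k % 150, f"y{k}") for k in range(1000)]
joined, sizes = hash_hash_join(x_rows, y_rows, 4)
```

Which set-1 slave scans a given row makes no difference to the result; only the routing function decides where a row is joined, and each X row lives in exactly one hash table.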
On the plus side, we have minimised the memory usage by sharing the data that goes into the hash table across N slaves, rather than having every slave build the whole hash table; we have minimised the (first pass of) messaging by sending each row from each slave in the first set to just one slave in the second set, rather than sending every row to every slave.
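Some back-of-envelope arithmetic, using the row counts from the example above, shows the trade-off between the two strategies. The function name is hypothetical and the model is deliberately simple: it counts rows only, ignoring row widths, message batching and skew.

```python
def distribution_volumes(x_rows, y_rows, degree):
    """Rough row counts crossing from slave set 1 to slave set 2, and hash
    table size per set-2 slave, for the two strategies described above."""
    return {
        "broadcast": {
            "x_rows_sent": x_rows * degree,      # every X row to every slave
            "y_rows_sent": 0,                    # Y is scanned where it is joined
            "hash_table_rows_per_slave": x_rows,
        },
        "hash": {
            "x_rows_sent": x_rows,               # each X row to one slave
            "y_rows_sent": y_rows,               # ...and each Y row to one slave
            "hash_table_rows_per_slave": x_rows / degree,
        },
    }


for degree in (4, 8, 16):
    v = distribution_volumes(100_000, 1_000_000, degree)
    total_hash = v["hash"]["x_rows_sent"] + v["hash"]["y_rows_sent"]
    print(degree, v["broadcast"]["x_rows_sent"], total_hash)
```

At degree 8 the broadcast sends 800,000 rows against 1,100,000 for the hash distribution, but every slave then holds all 100,000 rows of X in its hash table rather than about 12,500. By degree 16 the broadcast sends more rows (1,600,000) as well as using more memory.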
On the minus side, the first set of slaves has had to pass a large number of rows from table Y to the second set of slaves – which increases the CPU usage, and increases the scope for contention in the cross-chat between the two sets of slaves.
This is where the Bloom filter can come into play. If, for example, the optimizer thinks table X will return 100,000 rows and table Y will (based on the non-join predicates) return 1M rows, but the join will eliminate 90% of those rows, it may direct the second set of slaves to create a (small) Bloom filter from the values each has received and pass this back to the first set of slaves, which can then use the filter to eliminate a lot of the rows from table Y that would otherwise be passed to the second set of slaves. We do not expect Bloom filters to be perfect: they will eliminate only rows that will not be needed, but they may not manage to eliminate all of those rows, so the data that reaches the second set of slaves may still contain some rows that will disappear on the join, but it will certainly contain all the rows that should have survived.
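A Bloom filter is a bit array plus k hash functions: adding a key sets k bits, and a probe reports “definitely absent” if any of its k bits is clear. The Python sketch below shows why such a filter can let through some unwanted rows but never discards a wanted one. It is my own illustration rather than Oracle’s implementation: the class, the SHA-256-based hashing, the 1,024-bit size, the `mod(key, N)` routing and the choice of one filter per receiving slave are all assumptions.

```python
import hashlib


class BloomFilter:
    """Minimal Bloom filter: num_bits bits, num_hashes hash positions per key."""

    def __init__(self, num_bits, num_hashes):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive num_hashes bit positions by salting the key with the hash number.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        # False means "definitely not present"; True means "possibly present".
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))


def bloom_prefilter(x_rows, y_rows, degree, num_bits=1024, num_hashes=3):
    """Set 2 builds one filter per slave from the X keys routed to it;
    set 1 drops any Y row whose key fails the filter of its target slave."""
    filters = [BloomFilter(num_bits, num_hashes) for _ in range(degree)]
    for key, _ in x_rows:
        filters[key % degree].add(key)           # same mod(join_column, N) routing
    return [row for row in y_rows
            if filters[row[0] % degree].might_contain(row[0])]


x_rows = [(k, f"x{k}") for k in range(0, 1000, 10)]   # 100 join keys
y_rows = [(k, f"y{k}") for k in range(1000)]          # only 10% will join
passed = bloom_prefilter(x_rows, y_rows, 4)
```

Every Y row whose key appears in X survives the filter, because its bits were all set when X was loaded; most of the 900 non-matching rows are discarded before they are sent, and any false positives that slip through are removed by the join itself.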
That completes my brief outline of the absolute basics of the interaction of parallel execution slaves and what they are trying to achieve. In the next article I’ll talk about “table queues” and the order of operation. (Comment 7 below lists the rest of the installments in this series.)